diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStats.java index 7a513bc4d5f84..452ca90205671 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStats.java @@ -8,6 +8,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; @@ -36,23 +37,34 @@ public class TransformIndexerStats extends IndexerJobStats { public static ParseField SEARCH_TOTAL = new ParseField("search_total"); public static ParseField SEARCH_FAILURES = new ParseField("search_failures"); public static ParseField INDEX_FAILURES = new ParseField("index_failures"); - public static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS = - new ParseField("exponential_avg_checkpoint_duration_ms"); - public static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED = - new ParseField("exponential_avg_documents_indexed"); - public static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED = - new ParseField("exponential_avg_documents_processed"); + public static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS = new ParseField("exponential_avg_checkpoint_duration_ms"); + public static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED = new ParseField("exponential_avg_documents_indexed"); + public static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED = new ParseField("exponential_avg_documents_processed"); // This changes how much "weight" past calculations have. // The shorter the window, the less "smoothing" will occur. private static final int EXP_AVG_WINDOW = 10; - private static final double ALPHA = 2.0/(EXP_AVG_WINDOW + 1); + private static final double ALPHA = 2.0 / (EXP_AVG_WINDOW + 1); private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( - NAME, true, - args -> new TransformIndexerStats( - (long) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6], - (long) args[7], (long) args[8], (long) args[9], (Double) args[10], (Double) args[11], (Double) args[12])); + NAME, + true, + args -> new TransformIndexerStats( + (long) args[0], + (long) args[1], + (long) args[2], + (long) args[3], + (long) args[4], + (long) args[5], + (long) args[6], + (long) args[7], + (long) args[8], + (long) args[9], + (Double) args[10], + (Double) args[11], + (Double) args[12] + ) + ); static { LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES); @@ -73,6 +85,7 @@ public class TransformIndexerStats extends IndexerJobStats { private double expAvgCheckpointDurationMs; private double expAvgDocumentsIndexed; private double expAvgDocumentsProcessed; + /** * Create with all stats set to zero */ @@ -80,30 +93,54 @@ public TransformIndexerStats() { super(); } - public TransformIndexerStats(long numPages, long numInputDocuments, long numOutputDocuments, - long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal, - long indexFailures, long searchFailures, Double expAvgCheckpointDurationMs, - Double expAvgDocumentsIndexed, Double expAvgDocumentsProcessed ) { - super(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal, - indexFailures, searchFailures); + public TransformIndexerStats( + long numPages, + long numInputDocuments, + long numOutputDocuments, + long numInvocations, + long indexTime, + long searchTime, + long indexTotal, + long searchTotal, + long indexFailures, + long searchFailures, + Double expAvgCheckpointDurationMs, + Double expAvgDocumentsIndexed, + Double expAvgDocumentsProcessed + ) { + super( + numPages, + numInputDocuments, + numOutputDocuments, + numInvocations, + indexTime, + searchTime, + indexTotal, + searchTotal, + indexFailures, + searchFailures + ); this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs; this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed; this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed; } - public TransformIndexerStats(long numPages, long numInputDocuments, long numOutputDocuments, - long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal, - long indexFailures, long searchFailures) { - this(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal, - indexFailures, searchFailures, 0.0, 0.0, 0.0); - } - public TransformIndexerStats(TransformIndexerStats other) { - this(other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations, - other.indexTime, other.searchTime, other.indexTotal, other.searchTotal, other.indexFailures, other.searchFailures); - this.expAvgCheckpointDurationMs = other.expAvgCheckpointDurationMs; - this.expAvgDocumentsIndexed = other.expAvgDocumentsIndexed; - this.expAvgDocumentsProcessed = other.expAvgDocumentsProcessed; + this( + other.numPages, + other.numInputDocuments, + other.numOuputDocuments, + other.numInvocations, + other.indexTime, + other.searchTime, + other.indexTotal, + other.searchTotal, + other.indexFailures, + other.searchFailures, + other.expAvgCheckpointDurationMs, + other.expAvgDocumentsIndexed, + other.expAvgDocumentsProcessed + ); } public TransformIndexerStats(StreamInput in) throws IOException { @@ -180,7 +217,7 @@ public void incrementCheckpointExponentialAverages(long checkpointDurationMs, lo } private double calculateExpAvg(double previousExpValue, double alpha, long observedValue) { - return alpha * observedValue + (1-alpha) * previousExpValue; + return alpha * observedValue + (1 - alpha) * previousExpValue; } @Override @@ -212,9 +249,26 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations, - indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal, - expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed); + return Objects.hash( + numPages, + numInputDocuments, + numOuputDocuments, + numInvocations, + indexTime, + searchTime, + indexFailures, + searchFailures, + indexTotal, + searchTotal, + expAvgCheckpointDurationMs, + expAvgDocumentsIndexed, + expAvgDocumentsProcessed + ); + } + + @Override + public String toString() { + return Strings.toString(this); } public static TransformIndexerStats fromXContent(XContentParser parser) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStatsTests.java index 05866c9c0b1b7..f83f820e19433 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStatsTests.java @@ -31,13 +31,21 @@ protected TransformIndexerStats doParseInstance(XContentParser parser) { } public static TransformIndexerStats randomStats() { - return new TransformIndexerStats(randomLongBetween(10L, 10000L), - randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), - randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), + return new TransformIndexerStats( + randomLongBetween(10L, 10000L), + randomLongBetween(0L, 10000L), + randomLongBetween(0L, 10000L), + randomLongBetween(0L, 10000L), + randomLongBetween(0L, 10000L), + randomLongBetween(0L, 10000L), + randomLongBetween(0L, 10000L), + randomLongBetween(0L, 10000L), + randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomBoolean() ? randomDouble() : null, randomBoolean() ? randomDouble() : null, - randomBoolean() ? randomDouble() : null); + randomBoolean() ? randomDouble() : null + ); } public void testExpAvgIncrement() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java index 45b32804ccda3..ee4bd9a47b71b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java @@ -69,7 +69,7 @@ public void testBwcWith73() throws IOException { STARTED, randomBoolean() ? null : randomAlphaOfLength(100), randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(), - new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0.0, 0.0, 0.0), new TransformCheckpointingInfo( new TransformCheckpointStats(0, null, null, 10, 100), new TransformCheckpointStats(0, null, null, 100, 1000), diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml index fe38576309186..cdf6d4631238b 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml @@ -247,7 +247,6 @@ teardown: - gte: { transforms.0.stats.search_time_in_ms: 0 } - lte: { transforms.0.stats.search_total: 1 } - match: { transforms.0.stats.search_failures: 0 } - - match: { transforms.0.stats.exponential_avg_checkpoint_duration_ms: 0.0 } - match: { transforms.0.stats.exponential_avg_documents_indexed: 0.0 } - match: { transforms.0.stats.exponential_avg_documents_processed: 0.0 } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java index 8c80616fcd7eb..dfeb203d3ca85 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java @@ -29,11 +29,9 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase { private static final String TEST_USER_NAME = "transform_user"; - private static final String BASIC_AUTH_VALUE_TRANSFORM_USER = - basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING); + private static final String BASIC_AUTH_VALUE_TRANSFORM_USER = basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING); private static final String TEST_ADMIN_USER_NAME = "transform_admin"; - private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN = - basicAuthHeaderValue(TEST_ADMIN_USER_NAME, TEST_PASSWORD_SECURE_STRING); + private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN = basicAuthHeaderValue(TEST_ADMIN_USER_NAME, TEST_PASSWORD_SECURE_STRING); private static boolean indicesCreated = false; @@ -101,13 +99,13 @@ public void testGetAndGetStats() throws Exception { stats = entityAsMap(client().performRequest(getRequest)); assertEquals(3, XContentMapValues.extractValue("count", stats)); - List> transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + List> transformsStats = (List>) XContentMapValues.extractValue("transforms", stats); // Verify that both transforms have valid stats for (Map transformStats : transformsStats) { - Map stat = (Map)transformStats.get("stats"); - assertThat("documents_processed is not > 0.", ((Integer)stat.get("documents_processed")), greaterThan(0)); - assertThat("search_total is not > 0.", ((Integer)stat.get("search_total")), greaterThan(0)); - assertThat("pages_processed is not > 0.", ((Integer)stat.get("pages_processed")), greaterThan(0)); + Map stat = (Map) transformStats.get("stats"); + assertThat("documents_processed is not > 0.", ((Integer) stat.get("documents_processed")), greaterThan(0)); + assertThat("search_total is not > 0.", ((Integer) stat.get("search_total")), greaterThan(0)); + assertThat("pages_processed is not > 0.", ((Integer) stat.get("pages_processed")), greaterThan(0)); /* TODO progress is now checkpoint progress and it may be that no checkpoint is in progress here Map progress = (Map)XContentMapValues.extractValue("checkpointing.next.checkpoint_progress", transformStats); @@ -122,7 +120,7 @@ public void testGetAndGetStats() throws Exception { stats = entityAsMap(client().performRequest(getRequest)); assertEquals(1, XContentMapValues.extractValue("count", stats)); - transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + transformsStats = (List>) XContentMapValues.extractValue("transforms", stats); assertEquals(1, transformsStats.size()); assertEquals("stopped", XContentMapValues.extractValue("state", transformsStats.get(0))); assertNull(XContentMapValues.extractValue("checkpointing.next.position", transformsStats.get(0))); @@ -133,12 +131,11 @@ public void testGetAndGetStats() throws Exception { stats = entityAsMap(client().performRequest(getRequest)); assertEquals(1, XContentMapValues.extractValue("count", stats)); - transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + transformsStats = (List>) XContentMapValues.extractValue("transforms", stats); assertEquals(1, transformsStats.size()); assertThat(XContentMapValues.extractValue("state", transformsStats.get(0)), oneOf("started", "indexing")); assertEquals(1, XContentMapValues.extractValue("checkpointing.last.checkpoint", transformsStats.get(0))); - // check all the different ways to retrieve all transforms getRequest = createRequestWithAuth("GET", getTransformEndpoint(), authHeader); Map transforms = entityAsMap(client().performRequest(getRequest)); @@ -165,12 +162,13 @@ public void testGetPersistedStatsWithoutTask() throws Exception { stopTransform("pivot_stats_1", false); // Get rid of the first transform task, but keep the configuration - client().performRequest(new Request("POST", "_tasks/_cancel?actions="+TransformField.TASK_NAME+"*")); + client().performRequest(new Request("POST", "_tasks/_cancel?actions=" + TransformField.TASK_NAME + "*")); // Verify that the task is gone - Map tasks = - entityAsMap(client().performRequest(new Request("GET", "_tasks?actions="+TransformField.TASK_NAME+"*"))); - assertTrue(((Map)XContentMapValues.extractValue("nodes", tasks)).isEmpty()); + Map tasks = entityAsMap( + client().performRequest(new Request("GET", "_tasks?actions=" + TransformField.TASK_NAME + "*")) + ); + assertTrue(((Map) XContentMapValues.extractValue("nodes", tasks)).isEmpty()); createPivotReviewsTransform("pivot_stats_2", "pivot_reviews_stats_2", null); startAndWaitForTransform("pivot_stats_2", "pivot_reviews_stats_2"); @@ -178,13 +176,13 @@ public void testGetPersistedStatsWithoutTask() throws Exception { Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + "_stats", BASIC_AUTH_VALUE_TRANSFORM_ADMIN); Map stats = entityAsMap(client().performRequest(getRequest)); assertEquals(2, XContentMapValues.extractValue("count", stats)); - List> transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + List> transformsStats = (List>) XContentMapValues.extractValue("transforms", stats); // Verify that both transforms, the one with the task and the one without have statistics for (Map transformStats : transformsStats) { - Map stat = (Map)transformStats.get("stats"); - assertThat(((Integer)stat.get("documents_processed")), greaterThan(0)); - assertThat(((Integer)stat.get("search_total")), greaterThan(0)); - assertThat(((Integer)stat.get("pages_processed")), greaterThan(0)); + Map stat = (Map) transformStats.get("stats"); + assertThat(((Integer) stat.get("documents_processed")), greaterThan(0)); + assertThat(((Integer) stat.get("search_total")), greaterThan(0)); + assertThat(((Integer) stat.get("pages_processed")), greaterThan(0)); } } @@ -202,13 +200,13 @@ public void testGetProgressStatsWithPivotQuery() throws Exception { Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId + "/_stats", authHeader); Map stats = entityAsMap(client().performRequest(getRequest)); assertEquals(1, XContentMapValues.extractValue("count", stats)); - List> transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + List> transformsStats = (List>) XContentMapValues.extractValue("transforms", stats); // Verify that the transform has stats and the total docs process matches the expected for (Map transformStats : transformsStats) { - Map stat = (Map)transformStats.get("stats"); - assertThat("documents_processed is not > 0.", ((Integer)stat.get("documents_processed")), greaterThan(0)); - assertThat("search_total is not > 0.", ((Integer)stat.get("search_total")), greaterThan(0)); - assertThat("pages_processed is not > 0.", ((Integer)stat.get("pages_processed")), greaterThan(0)); + Map stat = (Map) transformStats.get("stats"); + assertThat("documents_processed is not > 0.", ((Integer) stat.get("documents_processed")), greaterThan(0)); + assertThat("search_total is not > 0.", ((Integer) stat.get("search_total")), greaterThan(0)); + assertThat("pages_processed is not > 0.", ((Integer) stat.get("pages_processed")), greaterThan(0)); /* TODO progress is now checkpoint progress and it may be that no checkpoint is in progress here Map progress = (Map)XContentMapValues.extractValue("checkpointing.next.checkpoint_progress", transformStats); @@ -226,8 +224,12 @@ public void testGetStatsWithContinuous() throws Exception { String transformSrc = "reviews_cont_pivot_test"; createReviewsIndex(transformSrc); final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null); - String config = "{ \"dest\": {\"index\":\"" + transformDest + "\"}," - + " \"source\": {\"index\":\"" + transformSrc + "\"}," + String config = "{ \"dest\": {\"index\":\"" + + transformDest + + "\"}," + + " \"source\": {\"index\":\"" + + transformSrc + + "\"}," + " \"frequency\": \"1s\"," + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"1s\"}}," + " \"pivot\": {" @@ -251,20 +253,28 @@ public void testGetStatsWithContinuous() throws Exception { Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId + "/_stats", null); Map stats = entityAsMap(client().performRequest(getRequest)); - List> transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + List> transformsStats = (List>) XContentMapValues.extractValue("transforms", stats); assertEquals(1, transformsStats.size()); - // No continuous checkpoints have been seen and thus all exponential averages should be 0.0 + // No continuous checkpoints have been seen and thus all exponential averages should be equal to the batch stats for (Map transformStats : transformsStats) { - transformStats = (Map)transformStats.get("stats"); - assertThat("exponential_avg_checkpoint_duration_ms is not 0.0", - transformStats.get("exponential_avg_checkpoint_duration_ms"), - equalTo(0.0)); - assertThat("exponential_avg_documents_indexed is not 0.0", - transformStats.get("exponential_avg_documents_indexed"), - equalTo(0.0)); - assertThat("exponential_avg_documents_processed is not 0.0", + transformStats = (Map) transformStats.get("stats"); + assertThat(transformStats.get("documents_processed"), equalTo(1000)); + assertThat(transformStats.get("documents_indexed"), equalTo(27)); + assertThat( + "exponential_avg_checkpoint_duration_ms is not 0.0", + (Double) transformStats.get("exponential_avg_checkpoint_duration_ms"), + greaterThan(0.0) + ); + assertThat( + "exponential_avg_documents_indexed does not match documents_indexed", + (Double) transformStats.get("exponential_avg_documents_indexed"), + equalTo(((Integer) transformStats.get("documents_indexed")).doubleValue()) + ); + assertThat( + "exponential_avg_documents_processed does not match documents_processed", transformStats.get("exponential_avg_documents_processed"), - equalTo(0.0)); + equalTo(((Integer) transformStats.get("documents_processed")).doubleValue()) + ); } int numDocs = 10; @@ -296,23 +306,27 @@ public void testGetStatsWithContinuous() throws Exception { // We should now have exp avgs since we have processed a continuous checkpoint assertBusy(() -> { Map statsResponse = entityAsMap(client().performRequest(getRequest)); - List> contStats = (List>)XContentMapValues.extractValue("transforms", statsResponse); + List> contStats = (List>) XContentMapValues.extractValue("transforms", statsResponse); assertEquals(1, contStats.size()); for (Map transformStats : contStats) { - Map statsObj = (Map)transformStats.get("stats"); - assertThat("exponential_avg_checkpoint_duration_ms is 0", - (Double)statsObj.get("exponential_avg_checkpoint_duration_ms"), - greaterThan(0.0)); - assertThat("exponential_avg_documents_indexed is 0", - (Double)statsObj.get("exponential_avg_documents_indexed"), - greaterThan(0.0)); - assertThat("exponential_avg_documents_processed is 0", - (Double)statsObj.get("exponential_avg_documents_processed"), - greaterThan(0.0)); - Map checkpointing = (Map)transformStats.get("checkpointing"); - assertThat("changes_last_detected_at is null", - checkpointing.get("changes_last_detected_at"), - is(notNullValue())); + Map statsObj = (Map) transformStats.get("stats"); + assertThat( + "exponential_avg_checkpoint_duration_ms is 0", + (Double) statsObj.get("exponential_avg_checkpoint_duration_ms"), + greaterThan(0.0) + ); + assertThat( + "exponential_avg_documents_indexed is 0", + (Double) statsObj.get("exponential_avg_documents_indexed"), + greaterThan(0.0) + ); + assertThat( + "exponential_avg_documents_processed is 0", + (Double) statsObj.get("exponential_avg_documents_processed"), + greaterThan(0.0) + ); + Map checkpointing = (Map) transformStats.get("checkpointing"); + assertThat("changes_last_detected_at is null", checkpointing.get("changes_last_detected_at"), is(notNullValue())); } }, 120, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java index 1bc5198144889..ede378b3b3ec2 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java @@ -70,13 +70,21 @@ public void testUsage() throws Exception { Request getRequest = new Request("GET", getTransformEndpoint() + "test_usage/_stats"); Map stats = entityAsMap(client().performRequest(getRequest)); - Map expectedStats = new HashMap<>(); + Map expectedStats = new HashMap<>(); for (String statName : PROVIDED_STATS) { @SuppressWarnings("unchecked") - List specificStatistic = ((List) XContentMapValues.extractValue("transforms.stats." + statName, stats)); + List specificStatistic = (List) (XContentMapValues.extractValue("transforms.stats." + statName, stats)); assertNotNull(specificStatistic); - Integer statistic = (specificStatistic).get(0); - expectedStats.put(statName, statistic); + expectedStats.put(statName, extractStatsAsDouble(specificStatistic.get(0))); + } + + getRequest = new Request("GET", getTransformEndpoint() + "test_usage_continuous/_stats"); + stats = entityAsMap(client().performRequest(getRequest)); + for (String statName : PROVIDED_STATS) { + @SuppressWarnings("unchecked") + List specificStatistic = (List) (XContentMapValues.extractValue("transforms.stats." + statName, stats)); + assertNotNull(specificStatistic); + expectedStats.compute(statName, (key, value) -> value + extractStatsAsDouble(specificStatistic.get(0))); } // Simply because we wait for continuous to reach checkpoint 1, does not mean that the statistics are written yet. @@ -96,8 +104,9 @@ public void testUsage() throws Exception { } assertEquals( "Incorrect stat " + statName, - expectedStats.get(statName) * 2, - XContentMapValues.extractValue("transform.stats." + statName, statsMap) + expectedStats.get(statName).doubleValue(), + extractStatsAsDouble(XContentMapValues.extractValue("transform.stats." + statName, statsMap)), + 0.0001 ); } // Refresh the index so that statistics are searchable @@ -112,4 +121,14 @@ public void testUsage() throws Exception { assertEquals(3, XContentMapValues.extractValue("transform.transforms._all", usageAsMap)); assertEquals(3, XContentMapValues.extractValue("transform.transforms.stopped", usageAsMap)); } + + private double extractStatsAsDouble(Object statsObject) { + if (statsObject instanceof Integer) { + return ((Integer) statsObject).doubleValue(); + } else if (statsObject instanceof Double) { + return (Double) statsObject; + } + fail("unexpected value type for stats"); + return 0; + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformFeatureSet.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformFeatureSet.java index 7900fffb41fdc..fba806a3970b6 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformFeatureSet.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformFeatureSet.java @@ -36,8 +36,8 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; -import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import java.util.ArrayList; import java.util.Arrays; @@ -67,7 +67,10 @@ public class TransformFeatureSet implements XPackFeatureSet { TransformIndexerStats.INDEX_TOTAL.getPreferredName(), TransformIndexerStats.SEARCH_TOTAL.getPreferredName(), TransformIndexerStats.INDEX_FAILURES.getPreferredName(), - TransformIndexerStats.SEARCH_FAILURES.getPreferredName(), }; + TransformIndexerStats.SEARCH_FAILURES.getPreferredName(), + TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(), + TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(), + TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(), }; @Inject public TransformFeatureSet(Settings settings, ClusterService clusterService, Client client, @Nullable XPackLicenseState licenseState) { @@ -166,29 +169,32 @@ public void usage(ActionListener listener) { } static TransformIndexerStats parseSearchAggs(SearchResponse searchResponse) { - List statisticsList = new ArrayList<>(PROVIDED_STATS.length); + List statisticsList = new ArrayList<>(PROVIDED_STATS.length); for (String statName : PROVIDED_STATS) { Aggregation agg = searchResponse.getAggregations().get(statName); if (agg instanceof NumericMetricsAggregation.SingleValue) { - statisticsList.add((long) ((NumericMetricsAggregation.SingleValue) agg).value()); + statisticsList.add(((NumericMetricsAggregation.SingleValue) agg).value()); } else { - statisticsList.add(0L); + statisticsList.add(0.0); } } return new TransformIndexerStats( - statisticsList.get(0), // numPages - statisticsList.get(1), // numInputDocuments - statisticsList.get(2), // numOutputDocuments - statisticsList.get(3), // numInvocations - statisticsList.get(4), // indexTime - statisticsList.get(5), // searchTime - statisticsList.get(6), // indexTotal - statisticsList.get(7), // searchTotal - statisticsList.get(8), // indexFailures - statisticsList.get(9) - ); // searchFailures + statisticsList.get(0).longValue(), // numPages + statisticsList.get(1).longValue(), // numInputDocuments + statisticsList.get(2).longValue(), // numOutputDocuments + statisticsList.get(3).longValue(), // numInvocations + statisticsList.get(4).longValue(), // indexTime + statisticsList.get(5).longValue(), // searchTime + statisticsList.get(6).longValue(), // indexTotal + statisticsList.get(7).longValue(), // searchTotal + statisticsList.get(8).longValue(), // indexFailures + statisticsList.get(9).longValue(), // searchFailures + statisticsList.get(10), // exponential_avg_checkpoint_duration_ms + statisticsList.get(11), // exponential_avg_documents_indexed + statisticsList.get(12) // exponential_avg_documents_processed + ); } static void getStatisticSummations(Client client, ActionListener statsListener) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 6f1925bdc8172..60afee087fb0c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -359,9 +359,8 @@ protected void onFinish(ActionListener listener) { if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) { progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed()); } - // If the last checkpoint is now greater than 1, that means that we have just processed the first - // continuous checkpoint and should start recording the exponential averages - if (lastCheckpoint != null && lastCheckpoint.getCheckpoint() > 1) { + + if (lastCheckpoint != null) { long docsIndexed = 0; long docsProcessed = 0; // This should not happen as we simply create a new one when we reach continuous checkpoints diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformFeatureSetTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformFeatureSetTests.java index 68e8e43916e8e..b34b9d803a407 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformFeatureSetTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformFeatureSetTests.java @@ -135,8 +135,11 @@ public void testParseSearchAggs() { 7, // indexTotal 8, // searchTotal 9, // indexFailures - 10 - ); // searchFailures + 10, // searchFailures + 11.0, // exponential_avg_checkpoint_duration_ms + 12.0, // exponential_avg_documents_indexed + 13.0 // exponential_avg_documents_processed + ); int currentStat = 1; List aggs = new ArrayList<>(PROVIDED_STATS.length);