diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 4c82c90b2d544..611d31978692c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -67,6 +68,8 @@ public static class Request extends SingleShardRequest { private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT; private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE; + private long relativeStartNanos; + public Request(ShardId shardId, String expectedHistoryUUID) { super(shardId.getIndexName()); this.shardId = shardId; @@ -142,6 +145,9 @@ public void readFrom(StreamInput in) throws IOException { expectedHistoryUUID = in.readString(); pollTimeout = in.readTimeValue(); maxBatchSize = new ByteSizeValue(in); + + // Starting the clock in order to know how much time is spent on fetching operations: + relativeStartNanos = System.nanoTime(); } @Override @@ -220,6 +226,12 @@ public Translog.Operation[] getOperations() { return operations; } + private long tookInMillis; + + public long getTookInMillis() { + return tookInMillis; + } + Response() { } @@ -228,13 +240,15 @@ public Translog.Operation[] getOperations() { final long globalCheckpoint, final long maxSeqNo, final long maxSeqNoOfUpdatesOrDeletes, - final Translog.Operation[] operations) { + final Translog.Operation[] operations, + final long tookInMillis) { this.mappingVersion = mappingVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; this.operations = operations; + this.tookInMillis = tookInMillis; } @Override @@ -245,6 +259,7 @@ public void readFrom(final StreamInput in) throws IOException { maxSeqNo = in.readZLong(); maxSeqNoOfUpdatesOrDeletes = in.readZLong(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); + tookInMillis = in.readVLong(); } @Override @@ -255,6 +270,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeZLong(maxSeqNo); out.writeZLong(maxSeqNoOfUpdatesOrDeletes); out.writeArray(Translog.Operation::writeOperation, operations); + out.writeVLong(tookInMillis); } @Override @@ -266,12 +282,14 @@ public boolean equals(final Object o) { globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes && - Arrays.equals(operations, that.operations); + Arrays.equals(operations, that.operations) && + tookInMillis == that.tookInMillis; } @Override public int hashCode() { - return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations)); + return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, + Arrays.hashCode(operations), tookInMillis); } } @@ -308,7 +326,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc request.getMaxBatchSize()); // must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations. final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); - return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations); + return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos); } @Override @@ -373,7 +391,8 @@ private void globalCheckpointAdvancementFailure( clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); - listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY)); + listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY, + request.relativeStartNanos)); } catch (final Exception caught) { caught.addSuppressed(e); listener.onFailure(caught); @@ -459,8 +478,11 @@ static Translog.Operation[] getOperations( } static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, - final long maxSeqNoOfUpdates, final Translog.Operation[] operations) { - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations); + final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) { + long tookInNanos = System.nanoTime() - relativeStartNanos; + long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos); + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, + operations, tookInMillis); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 55d246fea4b31..b156a41896a10 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -71,6 +71,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private int numConcurrentReads = 0; private int numConcurrentWrites = 0; private long currentMappingVersion = 0; + private long totalFetchTookTimeMillis = 0; private long totalFetchTimeMillis = 0; private long numberOfSuccessfulFetches = 0; private long numberOfFailedFetches = 0; @@ -238,6 +239,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR fetchExceptions.remove(from); if (response.getOperations().length > 0) { // do not count polls against fetch stats + totalFetchTookTimeMillis += response.getTookInMillis(); totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); numberOfSuccessfulFetches++; operationsReceived += response.getOperations().length; @@ -449,6 +451,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() { buffer.size(), currentMappingVersion, totalFetchTimeMillis, + totalFetchTookTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, operationsReceived, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index a99e930188cf0..b9ac4fee3d23d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -26,7 +26,8 @@ protected ShardChangesAction.Response createTestInstance() { leaderGlobalCheckpoint, leaderMaxSeqNo, maxSeqNoOfUpdatesOrDeletes, - operations + operations, + randomNonNegativeLong() ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index d4b2d630966e8..50c0dd9ca49a0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -158,7 +158,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co final long globalCheckpoint = tracker.getCheckpoint(); final long maxSeqNo = tracker.getMaxSeqNo(); handler.accept(new ShardChangesAction.Response( - 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0])); + 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0], 1L)); } }; threadPool.generic().execute(task); @@ -233,7 +233,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, randomNonNegativeLong(), - ops.toArray(EMPTY)) + ops.toArray(EMPTY), + randomNonNegativeLong()) ) ); responses.put(prevGlobalCheckpoint, item); @@ -256,7 +257,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, randomNonNegativeLong(), - EMPTY + EMPTY, + randomNonNegativeLong() ); item.add(new TestResponse(null, mappingVersion, response)); } @@ -273,7 +275,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, localLeaderGCP, localLeaderGCP, randomNonNegativeLong(), - ops.toArray(EMPTY) + ops.toArray(EMPTY), + randomNonNegativeLong() ); item.add(new TestResponse(null, mappingVersion, response)); responses.put(fromSeqNo, Collections.unmodifiableList(item)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 9d6112350e4c4..6bd5136e4be56 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -56,6 +56,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomReadExceptions(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index b221a79e69efa..1988513c95d3b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -439,7 +439,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0])); + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0], 1L)); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -782,7 +782,8 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con leaderGlobalCheckpoints.poll(), maxSeqNos.poll(), randomNonNegativeLong(), - operations + operations, + 1L ); handler.accept(response); } @@ -813,7 +814,8 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro leaderGlobalCheckPoint, leaderGlobalCheckPoint, randomNonNegativeLong(), - ops.toArray(new Translog.Operation[0]) + ops.toArray(new Translog.Operation[0]), + 1L ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index d5594bcabb7a2..96bc2f04f5920 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -429,7 +429,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); if (from > seqNoStats.getGlobalCheckpoint()) { handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, - maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY)); + maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L)); return; } Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, @@ -440,7 +440,8 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdatesOrDeletes, - ops + ops, + 1L ); handler.accept(response); return; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java index 81f862e5cf0d9..c93da38666e22 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java @@ -49,6 +49,7 @@ protected FollowStatsAction.StatsResponses createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), Collections.emptyNavigableMap(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java index f96032d218121..219bf7187baad 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java @@ -94,6 +94,7 @@ public void testToXContent() throws IOException { final int numberOfQueuedWrites = randomIntBetween(0, Integer.MAX_VALUE); final long mappingVersion = randomIntBetween(0, Integer.MAX_VALUE); final long totalFetchTimeMillis = randomLongBetween(0, 4096); + final long totalFetchTookTimeMillis = randomLongBetween(0, 4096); final long numberOfSuccessfulFetches = randomNonNegativeLong(); final long numberOfFailedFetches = randomLongBetween(0, 8); final long operationsReceived = randomNonNegativeLong(); @@ -122,6 +123,7 @@ public void testToXContent() throws IOException { numberOfQueuedWrites, mappingVersion, totalFetchTimeMillis, + totalFetchTookTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, operationsReceived, @@ -166,6 +168,7 @@ public void testToXContent() throws IOException { + "\"number_of_queued_writes\":" + numberOfQueuedWrites + "," + "\"mapping_version\":" + mappingVersion + "," + "\"total_fetch_time_millis\":" + totalFetchTimeMillis + "," + + "\"total_fetch_leader_time_millis\":" + totalFetchTookTimeMillis + "," + "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + "," + "\"number_of_failed_fetches\":" + numberOfFailedFetches + "," + "\"operations_received\":" + operationsReceived + "," @@ -208,6 +211,7 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { 1, 1, 100, + 50, 10, 0, 10, @@ -226,7 +230,6 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { Map template = XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false); Map followStatsMapping = (Map) XContentMapValues.extractValue("mappings.doc.properties.ccr_stats.properties", template); - assertThat(serializedStatus.size(), equalTo(followStatsMapping.size())); for (Map.Entry entry : serializedStatus.entrySet()) { String fieldName = entry.getKey(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index f002fb44c14d7..e21729df58b54 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -48,6 +48,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { private static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes"); private static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version"); private static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis"); + private static final ParseField TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD = new ParseField("total_fetch_leader_time_millis"); private static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches"); private static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches"); private static final ParseField OPERATIONS_RECEIVED_FIELD = new ParseField("operations_received"); @@ -87,12 +88,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[19], (long) args[20], (long) args[21], + (long) args[22], new TreeMap<>( - ((List>>) args[22]) + ((List>>) args[23]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[23], - (ElasticsearchException) args[24])); + (long) args[24], + (ElasticsearchException) args[25])); public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; @@ -116,6 +118,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); @@ -228,6 +231,12 @@ public long totalFetchTimeMillis() { return totalFetchTimeMillis; } + private final long totalFetchLeaderTimeMillis; + + public long totalFetchLeaderTimeMillis() { + return totalFetchLeaderTimeMillis; + } + private final long numberOfSuccessfulFetches; public long numberOfSuccessfulFetches() { @@ -309,6 +318,7 @@ public ShardFollowNodeTaskStatus( final int numberOfQueuedWrites, final long mappingVersion, final long totalFetchTimeMillis, + final long totalFetchLeaderTimeMillis, final long numberOfSuccessfulFetches, final long numberOfFailedFetches, final long operationsReceived, @@ -334,6 +344,7 @@ public ShardFollowNodeTaskStatus( this.numberOfQueuedWrites = numberOfQueuedWrites; this.mappingVersion = mappingVersion; this.totalFetchTimeMillis = totalFetchTimeMillis; + this.totalFetchLeaderTimeMillis = totalFetchLeaderTimeMillis; this.numberOfSuccessfulFetches = numberOfSuccessfulFetches; this.numberOfFailedFetches = numberOfFailedFetches; this.operationsReceived = operationsReceived; @@ -362,6 +373,7 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException { this.numberOfQueuedWrites = in.readVInt(); this.mappingVersion = in.readVLong(); this.totalFetchTimeMillis = in.readVLong(); + this.totalFetchLeaderTimeMillis = in.readVLong(); this.numberOfSuccessfulFetches = in.readVLong(); this.numberOfFailedFetches = in.readVLong(); this.operationsReceived = in.readVLong(); @@ -397,6 +409,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(numberOfQueuedWrites); out.writeVLong(mappingVersion); out.writeVLong(totalFetchTimeMillis); + out.writeVLong(totalFetchLeaderTimeMillis); out.writeVLong(numberOfSuccessfulFetches); out.writeVLong(numberOfFailedFetches); out.writeVLong(operationsReceived); @@ -444,6 +457,10 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(), "total_fetch_time", new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS)); + builder.humanReadableField( + TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD.getPreferredName(), + "total_fetch_leader_time", + new TimeValue(totalFetchLeaderTimeMillis, TimeUnit.MILLISECONDS)); builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches); builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches); builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived); @@ -516,6 +533,7 @@ public boolean equals(final Object o) { numberOfQueuedWrites == that.numberOfQueuedWrites && mappingVersion == that.mappingVersion && totalFetchTimeMillis == that.totalFetchTimeMillis && + totalFetchLeaderTimeMillis == that.totalFetchLeaderTimeMillis && numberOfSuccessfulFetches == that.numberOfSuccessfulFetches && numberOfFailedFetches == that.numberOfFailedFetches && operationsReceived == that.operationsReceived && @@ -552,6 +570,7 @@ public int hashCode() { numberOfQueuedWrites, mappingVersion, totalFetchTimeMillis, + totalFetchLeaderTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, operationsReceived, diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index d55cdd690be09..791a0ea02c392 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -971,6 +971,9 @@ "total_fetch_time_millis": { "type": "long" }, + "total_fetch_leader_time_millis": { + "type": "long" + }, "number_of_successful_fetches": { "type": "long" },