Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Add total fetch time took stat #34577

Merged
merged 5 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -67,6 +68,8 @@ public static class Request extends SingleShardRequest<Request> {
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT;
private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE;

private long relativeStartNanos;

public Request(ShardId shardId, String expectedHistoryUUID) {
super(shardId.getIndexName());
this.shardId = shardId;
Expand Down Expand Up @@ -142,6 +145,9 @@ public void readFrom(StreamInput in) throws IOException {
expectedHistoryUUID = in.readString();
pollTimeout = in.readTimeValue();
maxBatchSize = new ByteSizeValue(in);

// Starting the clock in order to know how much time is spent on fetching operations:
relativeStartNanos = System.nanoTime();
}

@Override
Expand Down Expand Up @@ -220,6 +226,12 @@ public Translog.Operation[] getOperations() {
return operations;
}

private long tookInMillis;

/**
 * Returns the fetch duration, in milliseconds, carried by this response.
 *
 * @return the value of the {@code tookInMillis} field
 */
public long getTookInMillis() {
    return this.tookInMillis;
}

// No-argument constructor; leaves every field at its default value.
// NOTE(review): presumably paired with readFrom(StreamInput) to populate the fields on deserialization — confirm.
Response() {
}

Expand All @@ -228,13 +240,15 @@ public Translog.Operation[] getOperations() {
final long globalCheckpoint,
final long maxSeqNo,
final long maxSeqNoOfUpdatesOrDeletes,
final Translog.Operation[] operations) {
final Translog.Operation[] operations,
final long tookInMillis) {

this.mappingVersion = mappingVersion;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNo = maxSeqNo;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
this.operations = operations;
this.tookInMillis = tookInMillis;
}

@Override
Expand All @@ -245,6 +259,7 @@ public void readFrom(final StreamInput in) throws IOException {
maxSeqNo = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
tookInMillis = in.readVLong();
}

@Override
Expand All @@ -255,6 +270,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeZLong(maxSeqNo);
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
out.writeArray(Translog.Operation::writeOperation, operations);
out.writeVLong(tookInMillis);
}

@Override
Expand All @@ -266,12 +282,14 @@ public boolean equals(final Object o) {
globalCheckpoint == that.globalCheckpoint &&
maxSeqNo == that.maxSeqNo &&
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
Arrays.equals(operations, that.operations);
Arrays.equals(operations, that.operations) &&
tookInMillis == that.tookInMillis;
}

@Override
public int hashCode() {
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations));
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes,
Arrays.hashCode(operations), tookInMillis);
}
}

Expand Down Expand Up @@ -308,7 +326,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
request.getMaxBatchSize());
// must capture after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations);
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos);
}

@Override
Expand Down Expand Up @@ -373,7 +391,8 @@ private void globalCheckpointAdvancementFailure(
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY));
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY,
request.relativeStartNanos));
} catch (final Exception caught) {
caught.addSuppressed(e);
listener.onFailure(caught);
Expand Down Expand Up @@ -459,8 +478,11 @@ static Translog.Operation[] getOperations(
}

static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
final long maxSeqNoOfUpdates, final Translog.Operation[] operations) {
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations);
final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) {
long tookInNanos = System.nanoTime() - relativeStartNanos;
long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos);
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates,
operations, tookInMillis);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private int numConcurrentReads = 0;
private int numConcurrentWrites = 0;
private long currentMappingVersion = 0;
private long totalFetchTookTimeMillis = 0;
private long totalFetchTimeMillis = 0;
private long numberOfSuccessfulFetches = 0;
private long numberOfFailedFetches = 0;
Expand Down Expand Up @@ -238,6 +239,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
fetchExceptions.remove(from);
if (response.getOperations().length > 0) {
// do not count polls against fetch stats
totalFetchTookTimeMillis += response.getTookInMillis();
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
operationsReceived += response.getOperations().length;
Expand Down Expand Up @@ -449,6 +451,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
buffer.size(),
currentMappingVersion,
totalFetchTimeMillis,
totalFetchTookTimeMillis,
numberOfSuccessfulFetches,
numberOfFailedFetches,
operationsReceived,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ protected ShardChangesAction.Response createTestInstance() {
leaderGlobalCheckpoint,
leaderMaxSeqNo,
maxSeqNoOfUpdatesOrDeletes,
operations
operations,
randomNonNegativeLong()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
final long globalCheckpoint = tracker.getCheckpoint();
final long maxSeqNo = tracker.getMaxSeqNo();
handler.accept(new ShardChangesAction.Response(
0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0]));
0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0], 1L));
}
};
threadPool.generic().execute(task);
Expand Down Expand Up @@ -233,7 +233,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
nextGlobalCheckPoint,
nextGlobalCheckPoint,
randomNonNegativeLong(),
ops.toArray(EMPTY))
ops.toArray(EMPTY),
randomNonNegativeLong())
)
);
responses.put(prevGlobalCheckpoint, item);
Expand All @@ -256,7 +257,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
prevGlobalCheckpoint,
prevGlobalCheckpoint,
randomNonNegativeLong(),
EMPTY
EMPTY,
randomNonNegativeLong()
);
item.add(new TestResponse(null, mappingVersion, response));
}
Expand All @@ -273,7 +275,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
localLeaderGCP,
localLeaderGCP,
randomNonNegativeLong(),
ops.toArray(EMPTY)
ops.toArray(EMPTY),
randomNonNegativeLong()
);
item.add(new TestResponse(null, mappingVersion, response));
responses.put(fromSeqNo, Collections.unmodifiableList(item));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions(),
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public void testReceiveNothingExpectedSomething() {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

shardChangesRequests.clear();
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0]));
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0], 1L));

assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
Expand Down Expand Up @@ -782,7 +782,8 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con
leaderGlobalCheckpoints.poll(),
maxSeqNos.poll(),
randomNonNegativeLong(),
operations
operations,
1L
);
handler.accept(response);
}
Expand Down Expand Up @@ -813,7 +814,8 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro
leaderGlobalCheckPoint,
leaderGlobalCheckPoint,
randomNonNegativeLong(),
ops.toArray(new Translog.Operation[0])
ops.toArray(new Translog.Operation[0]),
1L
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
if (from > seqNoStats.getGlobalCheckpoint()) {
handler.accept(ShardChangesAction.getResponse(1L, seqNoStats,
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY));
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L));
return;
}
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
Expand All @@ -440,7 +440,8 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
seqNoStats.getGlobalCheckpoint(),
seqNoStats.getMaxSeqNo(),
maxSeqNoOfUpdatesOrDeletes,
ops
ops,
1L
);
handler.accept(response);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected FollowStatsAction.StatsResponses createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
Collections.emptyNavigableMap(),
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public void testToXContent() throws IOException {
final int numberOfQueuedWrites = randomIntBetween(0, Integer.MAX_VALUE);
final long mappingVersion = randomIntBetween(0, Integer.MAX_VALUE);
final long totalFetchTimeMillis = randomLongBetween(0, 4096);
final long totalFetchTookTimeMillis = randomLongBetween(0, 4096);
final long numberOfSuccessfulFetches = randomNonNegativeLong();
final long numberOfFailedFetches = randomLongBetween(0, 8);
final long operationsReceived = randomNonNegativeLong();
Expand Down Expand Up @@ -122,6 +123,7 @@ public void testToXContent() throws IOException {
numberOfQueuedWrites,
mappingVersion,
totalFetchTimeMillis,
totalFetchTookTimeMillis,
numberOfSuccessfulFetches,
numberOfFailedFetches,
operationsReceived,
Expand Down Expand Up @@ -166,6 +168,7 @@ public void testToXContent() throws IOException {
+ "\"number_of_queued_writes\":" + numberOfQueuedWrites + ","
+ "\"mapping_version\":" + mappingVersion + ","
+ "\"total_fetch_time_millis\":" + totalFetchTimeMillis + ","
+ "\"total_fetch_leader_time_millis\":" + totalFetchTookTimeMillis + ","
+ "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + ","
+ "\"number_of_failed_fetches\":" + numberOfFailedFetches + ","
+ "\"operations_received\":" + operationsReceived + ","
Expand Down Expand Up @@ -208,6 +211,7 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
1,
1,
100,
50,
10,
0,
10,
Expand All @@ -226,7 +230,6 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
Map<String, Object> template =
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues.extractValue("mappings.doc.properties.ccr_stats.properties", template);

assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
String fieldName = entry.getKey();
Expand Down
Loading