Skip to content

Commit

Permalink
Track in-progress indexing operations (#65996)
Browse files Browse the repository at this point in the history
Currently, our indexing pressure metrics only track the number of "bytes"
related to in-progress indexing requests. This commit introduces tracking of
the number of operations. Building out this metrics will add more information
for future monitoring and back-pressure.
  • Loading branch information
Tim-Brooks authored Dec 8, 2020
1 parent 524f39f commit c76058d
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 128 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final int indexingOps = bulkRequest.numberOfActions();
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ protected long primaryOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

@Override
protected int primaryOperationCount(BulkShardRequest request) {
return request.items().length;
}

public static void performOnPrimary(
BulkShardRequest request,
IndexShard primary,
Expand Down Expand Up @@ -439,6 +444,11 @@ protected long replicaOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

@Override
protected int replicaOperationCount(BulkShardRequest request) {
return request.items().length;
}

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ protected long primaryOperationSize(ResyncReplicationRequest request) {
return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}

@Override
protected int primaryOperationCount(ResyncReplicationRequest request) {
return request.getOperations().length;
}

public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) {
return request;
}
Expand All @@ -131,6 +136,11 @@ protected long replicaOperationSize(ResyncReplicationRequest request) {
return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}

@Override
protected int replicaOperationCount(ResyncReplicationRequest request) {
return request.getOperations().length;
}

public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected String executor(IndexShard shard) {

@Override
protected Releasable checkOperationLimits(Request request) {
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
return indexingPressure.markPrimaryOperationStarted(primaryOperationCount(request), primaryOperationSize(request), force(request));
}

protected boolean force(ReplicatedWriteRequest<?> request) {
Expand All @@ -106,31 +106,41 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal
// If this primary request was received from a local reroute initiated by the node client, we
// must mark a new primary operation local to the coordinating node.
if (localRerouteInitiatedByNodeClient) {
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request));
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationCount(request),
primaryOperationSize(request));
} else {
return () -> {};
}
} else {
// If this primary request was received directly from the network, we must mark a new primary
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
// primary delegation, after the primary relocation hand-off.
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
return indexingPressure.markPrimaryOperationStarted(primaryOperationCount(request), primaryOperationSize(request),
force(request));
}
}

protected long primaryOperationSize(Request request) {
return 0;
}

protected int primaryOperationCount(Request request) {
return 0;
}

@Override
protected Releasable checkReplicaLimits(ReplicaRequest request) {
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), force(request));
return indexingPressure.markReplicaOperationStarted(replicaOperationCount(request), replicaOperationSize(request), force(request));
}

protected long replicaOperationSize(ReplicaRequest request) {
return 0;
}

protected int replicaOperationCount(ReplicaRequest request) {
return 0;
}

/** Syncs operation result to the translog or throws a shard not available failure */
protected static Location syncOperationResultOrThrow(final Engine.Result operationResult,
final Location currentLocation) throws Exception {
Expand Down
55 changes: 32 additions & 23 deletions server/src/main/java/org/elasticsearch/index/IndexingPressure.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,19 @@ public class IndexingPressure {
private final AtomicLong currentPrimaryBytes = new AtomicLong(0);
private final AtomicLong currentReplicaBytes = new AtomicLong(0);

private final AtomicLong currentCoordinatingOps = new AtomicLong(0);
private final AtomicLong currentPrimaryOps = new AtomicLong(0);
private final AtomicLong currentReplicaOps = new AtomicLong(0);

private final AtomicLong totalCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
private final AtomicLong totalCoordinatingBytes = new AtomicLong(0);
private final AtomicLong totalPrimaryBytes = new AtomicLong(0);
private final AtomicLong totalReplicaBytes = new AtomicLong(0);

private final AtomicLong totalCoordinatingOps = new AtomicLong(0);
private final AtomicLong totalPrimaryOps = new AtomicLong(0);
private final AtomicLong totalReplicaOps = new AtomicLong(0);

private final AtomicLong coordinatingRejections = new AtomicLong(0);
private final AtomicLong primaryRejections = new AtomicLong(0);
private final AtomicLong replicaRejections = new AtomicLong(0);
Expand All @@ -55,7 +63,7 @@ public IndexingPressure(Settings settings) {
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
}

public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
public Releasable markCoordinatingOperationStarted(int operations, long bytes, boolean forceExecution) {
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long replicaWriteBytes = this.currentReplicaBytes.get();
long totalBytes = combinedBytes + replicaWriteBytes;
Expand All @@ -72,21 +80,29 @@ public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExec
"max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
}
currentCoordinatingBytes.getAndAdd(bytes);
currentCoordinatingOps.getAndAdd(operations);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalCoordinatingBytes.getAndAdd(bytes);
totalCoordinatingOps.getAndAdd(operations);
return () -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentCoordinatingBytes.getAndAdd(-bytes);
this.currentCoordinatingOps.getAndAdd(-operations);
};
}

public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(long bytes) {
public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(int operations, long bytes) {
currentPrimaryBytes.getAndAdd(bytes);
currentPrimaryOps.getAndAdd(operations);
totalPrimaryBytes.getAndAdd(bytes);
return () -> this.currentPrimaryBytes.getAndAdd(-bytes);
totalPrimaryOps.getAndAdd(operations);
return () -> {
this.currentPrimaryBytes.getAndAdd(-bytes);
this.currentPrimaryOps.getAndAdd(-operations);
};
}

public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution) {
public Releasable markPrimaryOperationStarted(int operations, long bytes, boolean forceExecution) {
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long replicaWriteBytes = this.currentReplicaBytes.get();
long totalBytes = combinedBytes + replicaWriteBytes;
Expand All @@ -103,15 +119,18 @@ public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution
"max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
}
currentPrimaryBytes.getAndAdd(bytes);
currentPrimaryOps.getAndAdd(operations);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalPrimaryBytes.getAndAdd(bytes);
totalPrimaryOps.getAndAdd(operations);
return () -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentPrimaryBytes.getAndAdd(-bytes);
this.currentPrimaryOps.getAndAdd(-operations);
};
}

public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) {
public Releasable markReplicaOperationStarted(int operations, long bytes, boolean forceExecution) {
long replicaWriteBytes = this.currentReplicaBytes.addAndGet(bytes);
if (forceExecution == false && replicaWriteBytes > replicaLimits) {
long replicaBytesWithoutOperation = replicaWriteBytes - bytes;
Expand All @@ -122,30 +141,20 @@ public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution
"replica_operation_bytes=" + bytes + ", " +
"max_replica_bytes=" + replicaLimits + "]", false);
}
currentReplicaOps.getAndAdd(operations);
totalReplicaBytes.getAndAdd(bytes);
return () -> this.currentReplicaBytes.getAndAdd(-bytes);
}

public long getCurrentCombinedCoordinatingAndPrimaryBytes() {
return currentCombinedCoordinatingAndPrimaryBytes.get();
}

public long getCurrentCoordinatingBytes() {
return currentCoordinatingBytes.get();
}

public long getCurrentPrimaryBytes() {
return currentPrimaryBytes.get();
}

public long getCurrentReplicaBytes() {
return currentReplicaBytes.get();
totalReplicaOps.getAndAdd(operations);
return () -> {
this.currentReplicaBytes.getAndAdd(-bytes);
this.currentReplicaOps.getAndAdd(-operations);
};
}

public IndexingPressureStats stats() {
return new IndexingPressureStats(totalCombinedCoordinatingAndPrimaryBytes.get(), totalCoordinatingBytes.get(),
totalPrimaryBytes.get(), totalReplicaBytes.get(), currentCombinedCoordinatingAndPrimaryBytes.get(),
currentCoordinatingBytes.get(), currentPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingRejections.get(),
primaryRejections.get(), replicaRejections.get(), primaryAndCoordinatingLimits);
primaryRejections.get(), replicaRejections.get(), primaryAndCoordinatingLimits, totalCoordinatingOps.get(),
totalPrimaryOps.get(), totalReplicaOps.get(), currentCoordinatingOps.get(), currentPrimaryOps.get(), currentReplicaOps.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
private final long replicaRejections;
private final long memoryLimit;

// These fields will be used for additional back-pressure and metrics in the future
private final long totalCoordinatingOps;
private final long totalPrimaryOps;
private final long totalReplicaOps;
private final long currentCoordinatingOps;
private final long currentPrimaryOps;
private final long currentReplicaOps;

public IndexingPressureStats(StreamInput in) throws IOException {
totalCombinedCoordinatingAndPrimaryBytes = in.readVLong();
totalCoordinatingBytes = in.readVLong();
Expand All @@ -66,12 +74,21 @@ public IndexingPressureStats(StreamInput in) throws IOException {
} else {
memoryLimit = -1L;
}

// These are not currently propagated across the network yet
this.totalCoordinatingOps = 0;
this.totalPrimaryOps = 0;
this.totalReplicaOps = 0;
this.currentCoordinatingOps = 0;
this.currentPrimaryOps = 0;
this.currentReplicaOps = 0;
}

public IndexingPressureStats(long totalCombinedCoordinatingAndPrimaryBytes, long totalCoordinatingBytes, long totalPrimaryBytes,
long totalReplicaBytes, long currentCombinedCoordinatingAndPrimaryBytes, long currentCoordinatingBytes,
long currentPrimaryBytes, long currentReplicaBytes, long coordinatingRejections, long primaryRejections,
long replicaRejections, long memoryLimit) {
long replicaRejections, long memoryLimit, long totalCoordinatingOps, long totalPrimaryOps,
long totalReplicaOps, long currentCoordinatingOps, long currentPrimaryOps, long currentReplicaOps) {
this.totalCombinedCoordinatingAndPrimaryBytes = totalCombinedCoordinatingAndPrimaryBytes;
this.totalCoordinatingBytes = totalCoordinatingBytes;
this.totalPrimaryBytes = totalPrimaryBytes;
Expand All @@ -84,6 +101,13 @@ public IndexingPressureStats(long totalCombinedCoordinatingAndPrimaryBytes, long
this.primaryRejections = primaryRejections;
this.replicaRejections = replicaRejections;
this.memoryLimit = memoryLimit;

this.totalCoordinatingOps = totalCoordinatingOps;
this.totalPrimaryOps = totalPrimaryOps;
this.totalReplicaOps = totalReplicaOps;
this.currentCoordinatingOps = currentCoordinatingOps;
this.currentPrimaryOps = currentPrimaryOps;
this.currentReplicaOps = currentReplicaOps;
}

@Override
Expand Down Expand Up @@ -151,6 +175,30 @@ public long getReplicaRejections() {
return replicaRejections;
}

public long getTotalCoordinatingOps() {
return totalCoordinatingOps;
}

public long getTotalPrimaryOps() {
return totalPrimaryOps;
}

public long getTotalReplicaOps() {
return totalReplicaOps;
}

public long getCurrentCoordinatingOps() {
return currentCoordinatingOps;
}

public long getCurrentPrimaryOps() {
return currentPrimaryOps;
}

public long getCurrentReplicaOps() {
return currentReplicaOps;
}

private static final String COMBINED = "combined_coordinating_and_primary";
private static final String COMBINED_IN_BYTES = "combined_coordinating_and_primary_in_bytes";
private static final String COORDINATING = "coordinating";
Expand Down
Loading

0 comments on commit c76058d

Please sign in to comment.