Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track in-progress indexing operations #65996

Merged
merged 9 commits into from
Dec 8, 2020

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final int indexingOps = bulkRequest.numberOfActions();
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ protected long primaryOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

/**
 * Reports the operation count for a bulk shard request on the primary:
 * one operation per item in {@code request.items()}.
 *
 * @param request the bulk shard request being executed on the primary
 * @return the length of the request's item array
 */
@Override
protected int primaryOperationCount(BulkShardRequest request) {
return request.items().length;
}

public static void performOnPrimary(
BulkShardRequest request,
IndexShard primary,
Expand Down Expand Up @@ -439,6 +444,11 @@ protected long replicaOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

/**
 * Reports the operation count for a bulk shard request on a replica:
 * one operation per item in {@code request.items()}.
 *
 * @param request the bulk shard request being executed on the replica
 * @return the length of the request's item array
 */
@Override
protected int replicaOperationCount(BulkShardRequest request) {
return request.items().length;
}

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ protected long primaryOperationSize(ResyncReplicationRequest request) {
return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}

/**
 * Reports the operation count for a resync request on the primary:
 * one operation per entry in {@code request.getOperations()}.
 *
 * @param request the resync replication request
 * @return the length of the request's operations array
 */
@Override
protected int primaryOperationCount(ResyncReplicationRequest request) {
return request.getOperations().length;
}

/**
 * Primary-side step for a resync request. Performs no work here and hands
 * the same request instance back to the caller.
 *
 * @param request the resync replication request
 * @return the {@code request} argument, unchanged
 */
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) {
return request;
}
Expand All @@ -131,6 +136,11 @@ protected long replicaOperationSize(ResyncReplicationRequest request) {
return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}

/**
 * Reports the operation count for a resync request on a replica:
 * one operation per entry in {@code request.getOperations()}.
 *
 * @param request the resync replication request
 * @return the length of the request's operations array
 */
@Override
protected int replicaOperationCount(ResyncReplicationRequest request) {
return request.getOperations().length;
}

public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected String executor(IndexShard shard) {

@Override
protected Releasable checkOperationLimits(Request request) {
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
return indexingPressure.markPrimaryOperationStarted(primaryOperationCount(request), primaryOperationSize(request), force(request));
}

protected boolean force(ReplicatedWriteRequest<?> request) {
Expand All @@ -106,31 +106,41 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal
// If this primary request was received from a local reroute initiated by the node client, we
// must mark a new primary operation local to the coordinating node.
if (localRerouteInitiatedByNodeClient) {
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request));
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationCount(request),
primaryOperationSize(request));
} else {
return () -> {};
}
} else {
// If this primary request was received directly from the network, we must mark a new primary
// operation. This happens if the write action skips the reroute step (e.g. resync) or during
// primary delegation, after the primary relocation hand-off.
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
return indexingPressure.markPrimaryOperationStarted(primaryOperationCount(request), primaryOperationSize(request),
force(request));
}
}

/**
 * Size of the primary request as passed to the indexing pressure
 * {@code markPrimaryOperation*Started} calls in this class.
 * This base implementation always reports 0.
 *
 * @param request the primary request
 * @return 0
 */
protected long primaryOperationSize(Request request) {
return 0;
}

/**
 * Number of operations in the primary request as passed to the indexing pressure
 * {@code markPrimaryOperation*Started} calls in this class.
 * This base implementation always reports 0.
 *
 * @param request the primary request
 * @return 0
 */
protected int primaryOperationCount(Request request) {
return 0;
}

@Override
protected Releasable checkReplicaLimits(ReplicaRequest request) {
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), force(request));
return indexingPressure.markReplicaOperationStarted(replicaOperationCount(request), replicaOperationSize(request), force(request));
}

/**
 * Size of the replica request as passed to
 * {@code indexingPressure.markReplicaOperationStarted} by {@code checkReplicaLimits}.
 * This base implementation always reports 0.
 *
 * @param request the replica request
 * @return 0
 */
protected long replicaOperationSize(ReplicaRequest request) {
return 0;
}

/**
 * Number of operations in the replica request as passed to
 * {@code indexingPressure.markReplicaOperationStarted} by {@code checkReplicaLimits}.
 * This base implementation always reports 0.
 *
 * @param request the replica request
 * @return 0
 */
protected int replicaOperationCount(ReplicaRequest request) {
return 0;
}

/** Syncs operation result to the translog or throws a shard not available failure */
protected static Location syncOperationResultOrThrow(final Engine.Result operationResult,
final Location currentLocation) throws Exception {
Expand Down Expand Up @@ -172,9 +182,11 @@ protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy() {
@Override
protected void shardOperationOnPrimary(
Request request, IndexShard primary, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener) {
final long enqueueTime = System.nanoTime();
threadPool.executor(executorFunction.apply(primary)).execute(new ActionRunnable<>(listener) {
@Override
protected void doRun() {
final long queuedNanos = System.nanoTime() - enqueueTime;
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
dispatchedShardOperationOnPrimary(request, primary, listener);
}

Expand All @@ -197,9 +209,11 @@ protected abstract void dispatchedShardOperationOnPrimary(
*/
@Override
protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
final long enqueueTime = System.nanoTime();
threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<>(listener) {
@Override
protected void doRun() {
final long queuedNanos = System.nanoTime() - enqueueTime;
dispatchedShardOperationOnReplica(request, replica, listener);
}

Expand Down
55 changes: 32 additions & 23 deletions server/src/main/java/org/elasticsearch/index/IndexingPressure.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,19 @@ public class IndexingPressure {
private final AtomicLong currentPrimaryBytes = new AtomicLong(0);
private final AtomicLong currentReplicaBytes = new AtomicLong(0);

private final AtomicLong currentCoordinatingOps = new AtomicLong(0);
private final AtomicLong currentPrimaryOps = new AtomicLong(0);
private final AtomicLong currentReplicaOps = new AtomicLong(0);

private final AtomicLong totalCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
private final AtomicLong totalCoordinatingBytes = new AtomicLong(0);
private final AtomicLong totalPrimaryBytes = new AtomicLong(0);
private final AtomicLong totalReplicaBytes = new AtomicLong(0);

private final AtomicLong totalCoordinatingOps = new AtomicLong(0);
private final AtomicLong totalPrimaryOps = new AtomicLong(0);
private final AtomicLong totalReplicaOps = new AtomicLong(0);

private final AtomicLong coordinatingRejections = new AtomicLong(0);
private final AtomicLong primaryRejections = new AtomicLong(0);
private final AtomicLong replicaRejections = new AtomicLong(0);
Expand All @@ -55,7 +63,7 @@ public IndexingPressure(Settings settings) {
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
}

public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
public Releasable markCoordinatingOperationStarted(int operations, long bytes, boolean forceExecution) {
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long replicaWriteBytes = this.currentReplicaBytes.get();
long totalBytes = combinedBytes + replicaWriteBytes;
Expand All @@ -72,21 +80,29 @@ public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExec
"max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
}
currentCoordinatingBytes.getAndAdd(bytes);
currentCoordinatingOps.getAndAdd(operations);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalCoordinatingBytes.getAndAdd(bytes);
totalCoordinatingOps.getAndAdd(operations);
return () -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentCoordinatingBytes.getAndAdd(-bytes);
this.currentCoordinatingOps.getAndAdd(-operations);
};
}

public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(long bytes) {
public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(int operations, long bytes) {
currentPrimaryBytes.getAndAdd(bytes);
currentPrimaryOps.getAndAdd(operations);
totalPrimaryBytes.getAndAdd(bytes);
return () -> this.currentPrimaryBytes.getAndAdd(-bytes);
totalPrimaryOps.getAndAdd(operations);
return () -> {
this.currentPrimaryBytes.getAndAdd(-bytes);
this.currentPrimaryOps.getAndAdd(-operations);
};
}

public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution) {
public Releasable markPrimaryOperationStarted(int operations, long bytes, boolean forceExecution) {
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long replicaWriteBytes = this.currentReplicaBytes.get();
long totalBytes = combinedBytes + replicaWriteBytes;
Expand All @@ -103,15 +119,18 @@ public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution
"max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
}
currentPrimaryBytes.getAndAdd(bytes);
currentPrimaryOps.getAndAdd(operations);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalPrimaryBytes.getAndAdd(bytes);
totalPrimaryOps.getAndAdd(operations);
return () -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentPrimaryBytes.getAndAdd(-bytes);
this.currentPrimaryOps.getAndAdd(-operations);
};
}

public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) {
public Releasable markReplicaOperationStarted(int operations, long bytes, boolean forceExecution) {
long replicaWriteBytes = this.currentReplicaBytes.addAndGet(bytes);
if (forceExecution == false && replicaWriteBytes > replicaLimits) {
long replicaBytesWithoutOperation = replicaWriteBytes - bytes;
Expand All @@ -122,30 +141,20 @@ public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution
"replica_operation_bytes=" + bytes + ", " +
"max_replica_bytes=" + replicaLimits + "]", false);
}
currentReplicaOps.getAndAdd(operations);
totalReplicaBytes.getAndAdd(bytes);
return () -> this.currentReplicaBytes.getAndAdd(-bytes);
}

public long getCurrentCombinedCoordinatingAndPrimaryBytes() {
return currentCombinedCoordinatingAndPrimaryBytes.get();
}

public long getCurrentCoordinatingBytes() {
return currentCoordinatingBytes.get();
}

public long getCurrentPrimaryBytes() {
return currentPrimaryBytes.get();
}

public long getCurrentReplicaBytes() {
return currentReplicaBytes.get();
totalReplicaOps.getAndAdd(operations);
return () -> {
this.currentReplicaBytes.getAndAdd(-bytes);
this.currentReplicaOps.getAndAdd(-operations);
};
}

public IndexingPressureStats stats() {
return new IndexingPressureStats(totalCombinedCoordinatingAndPrimaryBytes.get(), totalCoordinatingBytes.get(),
totalPrimaryBytes.get(), totalReplicaBytes.get(), currentCombinedCoordinatingAndPrimaryBytes.get(),
currentCoordinatingBytes.get(), currentPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingRejections.get(),
primaryRejections.get(), replicaRejections.get(), primaryAndCoordinatingLimits);
primaryRejections.get(), replicaRejections.get(), primaryAndCoordinatingLimits, totalCoordinatingOps.get(),
totalPrimaryOps.get(), totalReplicaOps.get(), currentCoordinatingOps.get(), currentPrimaryOps.get(), currentReplicaOps.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
private final long replicaRejections;
private final long memoryLimit;

private final long totalCoordinatingOps;
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
private final long totalPrimaryOps;
private final long totalReplicaOps;
private final long currentCoordinatingOps;
private final long currentPrimaryOps;
private final long currentReplicaOps;

public IndexingPressureStats(StreamInput in) throws IOException {
totalCombinedCoordinatingAndPrimaryBytes = in.readVLong();
totalCoordinatingBytes = in.readVLong();
Expand All @@ -66,12 +73,21 @@ public IndexingPressureStats(StreamInput in) throws IOException {
} else {
memoryLimit = -1L;
}

// These are not yet propagated across the network
this.totalCoordinatingOps = 0;
this.totalPrimaryOps = 0;
this.totalReplicaOps = 0;
this.currentCoordinatingOps = 0;
this.currentPrimaryOps = 0;
this.currentReplicaOps = 0;
}

public IndexingPressureStats(long totalCombinedCoordinatingAndPrimaryBytes, long totalCoordinatingBytes, long totalPrimaryBytes,
long totalReplicaBytes, long currentCombinedCoordinatingAndPrimaryBytes, long currentCoordinatingBytes,
long currentPrimaryBytes, long currentReplicaBytes, long coordinatingRejections, long primaryRejections,
long replicaRejections, long memoryLimit) {
long replicaRejections, long memoryLimit, long totalCoordinatingOps, long totalPrimaryOps,
long totalReplicaOps, long currentCoordinatingOps, long currentPrimaryOps, long currentReplicaOps) {
this.totalCombinedCoordinatingAndPrimaryBytes = totalCombinedCoordinatingAndPrimaryBytes;
this.totalCoordinatingBytes = totalCoordinatingBytes;
this.totalPrimaryBytes = totalPrimaryBytes;
Expand All @@ -84,6 +100,22 @@ public IndexingPressureStats(long totalCombinedCoordinatingAndPrimaryBytes, long
this.primaryRejections = primaryRejections;
this.replicaRejections = replicaRejections;
this.memoryLimit = memoryLimit;

this.totalCoordinatingOps = totalCoordinatingOps;
this.totalPrimaryOps = totalPrimaryOps;
this.totalReplicaOps = totalReplicaOps;
this.currentCoordinatingOps = currentCoordinatingOps;
this.currentPrimaryOps = currentPrimaryOps;
this.currentReplicaOps = currentReplicaOps;
}

/**
 * Convenience constructor that takes only the byte, rejection and memory-limit values.
 * Delegates to the full constructor with all six operation counts
 * (total and current coordinating/primary/replica ops) set to 0.
 */
public IndexingPressureStats(long totalCombinedCoordinatingAndPrimaryBytes, long totalCoordinatingBytes, long totalPrimaryBytes,
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
long totalReplicaBytes, long currentCombinedCoordinatingAndPrimaryBytes, long currentCoordinatingBytes,
long currentPrimaryBytes, long currentReplicaBytes, long coordinatingRejections, long primaryRejections,
long replicaRejections, long memoryLimit) {
this(totalCombinedCoordinatingAndPrimaryBytes, totalCoordinatingBytes, totalPrimaryBytes, totalReplicaBytes,
currentCombinedCoordinatingAndPrimaryBytes, currentCoordinatingBytes, currentPrimaryBytes, currentReplicaBytes,
coordinatingRejections, primaryRejections, replicaRejections, memoryLimit, 0, 0, 0, 0, 0, 0);
}

@Override
Expand Down Expand Up @@ -151,6 +183,30 @@ public long getReplicaRejections() {
return replicaRejections;
}

/**
 * @return the total coordinating operation count held by this stats object
 *         (0 when the instance was read from a stream, which does not carry op counts)
 */
public long getTotalCoordinatingOps() {
return totalCoordinatingOps;
}

/**
 * @return the total primary operation count held by this stats object
 *         (0 when the instance was read from a stream, which does not carry op counts)
 */
public long getTotalPrimaryOps() {
return totalPrimaryOps;
}

/**
 * @return the total replica operation count held by this stats object
 *         (0 when the instance was read from a stream, which does not carry op counts)
 */
public long getTotalReplicaOps() {
return totalReplicaOps;
}

/**
 * @return the in-progress coordinating operation count held by this stats object
 *         (0 when the instance was read from a stream, which does not carry op counts)
 */
public long getCurrentCoordinatingOps() {
return currentCoordinatingOps;
}

/**
 * @return the in-progress primary operation count held by this stats object
 *         (0 when the instance was read from a stream, which does not carry op counts)
 */
public long getCurrentPrimaryOps() {
return currentPrimaryOps;
}

/**
 * @return the in-progress replica operation count held by this stats object
 *         (0 when the instance was read from a stream, which does not carry op counts)
 */
public long getCurrentReplicaOps() {
return currentReplicaOps;
}

private static final String COMBINED = "combined_coordinating_and_primary";
private static final String COMBINED_IN_BYTES = "combined_coordinating_and_primary_in_bytes";
private static final String COORDINATING = "coordinating";
Expand Down
Loading