Skip to content

Commit

Permalink
Statically cache the most common instances of o.e.a.support.replicati…
Browse files Browse the repository at this point in the history
…on.ReplicationResponse.ShardInfo (#104571)

Interestingly enough, these instances account for more than 1% of
allocations even when there's no failures during the http_logs benchmark
indexing step (on coordinating/data nodes). => I think it's worthwhile
to make these immutable and precompute the most common ones (up to 9
replicas is probably overkill but these are cheap enough for it to be
irrelevant).
  • Loading branch information
original-brownbear authored Jan 22, 2024
1 parent e8370f8 commit 4076aa7
Show file tree
Hide file tree
Showing 15 changed files with 85 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void markOperationAsExecuted(Engine.Result result) {
}
executionResult = BulkItemResponse.success(current.id(), current.request().opType(), response);
// set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo());
executionResult.getResponse().setShardInfo(ReplicationResponse.ShardInfo.EMPTY);
locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation());
}
case FAILURE -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.ingest;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -36,7 +35,7 @@ public SimulateIndexResponse(StreamInput in) throws IOException {
super(in);
this.source = in.readBytesReference();
this.sourceXContentType = XContentType.valueOf(in.readString());
setShardInfo(new ReplicationResponse.ShardInfo(0, 0));
setShardInfo(ShardInfo.EMPTY);
}

@SuppressWarnings("this-escape")
Expand All @@ -52,7 +51,7 @@ public SimulateIndexResponse(
super(new ShardId(index, "", 0), id == null ? "<n/a>" : id, 0, 0, version, true, pipelines);
this.source = source;
this.sourceXContentType = sourceXContentType;
setShardInfo(new ReplicationResponse.ShardInfo(0, 0));
setShardInfo(ShardInfo.EMPTY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,14 +466,13 @@ private void decPendingAndFinishIfNeeded() {

private void finish() {
if (finished.compareAndSet(false, true)) {
final ReplicationResponse.ShardInfo.Failure[] failuresArray;
if (shardReplicaFailures.isEmpty()) {
failuresArray = ReplicationResponse.NO_FAILURES;
} else {
failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
shardReplicaFailures.toArray(failuresArray);
}
primaryResult.setShardInfo(new ReplicationResponse.ShardInfo(totalShards.get(), successfulShards.get(), failuresArray));
primaryResult.setShardInfo(
ReplicationResponse.ShardInfo.of(
totalShards.get(),
successfulShards.get(),
shardReplicaFailures.toArray(ReplicationResponse.NO_FAILURES)
)
);
resultListener.onResponse(primaryResult);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

Expand All @@ -43,7 +44,7 @@ public ReplicationResponse() {}

public ReplicationResponse(StreamInput in) throws IOException {
super(in);
shardInfo = new ReplicationResponse.ShardInfo(in);
shardInfo = ReplicationResponse.ShardInfo.readFrom(in);
}

@Override
Expand All @@ -61,30 +62,61 @@ public void setShardInfo(ShardInfo shardInfo) {

public static class ShardInfo implements Writeable, ToXContentObject {

// cache the most commonly used instances where all shard operations succeeded to save allocations on the transport layer
private static final ShardInfo[] COMMON_INSTANCES = IntStream.range(0, 10)
.mapToObj(i -> new ShardInfo(i, i, NO_FAILURES))
.toArray(ShardInfo[]::new);

public static final ShardInfo EMPTY = COMMON_INSTANCES[0];

private static final String TOTAL = "total";
private static final String SUCCESSFUL = "successful";
private static final String FAILED = "failed";
private static final String FAILURES = "failures";

private int total;
private int successful;
private Failure[] failures = ReplicationResponse.NO_FAILURES;
private final int total;
private final int successful;
private final Failure[] failures;

public ShardInfo() {}

public ShardInfo(StreamInput in) throws IOException {
total = in.readVInt();
successful = in.readVInt();
public static ShardInfo readFrom(StreamInput in) throws IOException {
int total = in.readVInt();
int successful = in.readVInt();
int size = in.readVInt();

final Failure[] failures;
if (size > 0) {
failures = new Failure[size];
for (int i = 0; i < size; i++) {
failures[i] = new Failure(in);
}
} else {
failures = NO_FAILURES;
}
return ShardInfo.of(total, successful, failures);
}

public static ShardInfo allSuccessful(int total) {
if (total < COMMON_INSTANCES.length) {
return COMMON_INSTANCES[total];
}
return new ShardInfo(total, total, NO_FAILURES);
}

public static ShardInfo of(int total, int successful) {
if (total == successful) {
return allSuccessful(total);
}
return new ShardInfo(total, successful, ReplicationResponse.NO_FAILURES);
}

public static ShardInfo of(int total, int successful, Failure[] failures) {
if (failures.length == 0) {
return of(total, successful);
}
return new ShardInfo(total, successful, failures);
}

public ShardInfo(int total, int successful, Failure... failures) {
private ShardInfo(int total, int successful, Failure[] failures) {
assert total >= 0 && successful >= 0;
this.total = total;
this.successful = successful;
Expand Down Expand Up @@ -188,7 +220,7 @@ public static ShardInfo fromXContent(XContentParser parser) throws IOException {
}
Failure[] failures = ReplicationResponse.NO_FAILURES;
if (failuresList != null) {
failures = failuresList.toArray(new Failure[failuresList.size()]);
failures = failuresList.toArray(ReplicationResponse.NO_FAILURES);
}
return new ShardInfo(total, successful, failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public UpdateResponse(StreamInput in) throws IOException {
* For example: update script with operation set to none
*/
public UpdateResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) {
this(new ShardInfo(0, 0), shardId, id, seqNo, primaryTerm, version, result);
this(ShardInfo.EMPTY, shardId, id, seqNo, primaryTerm, version, result);
}

@SuppressWarnings("this-escape")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ private void sendVerifyShardBeforeCloseRequest(
if (shardRoutingTable.primaryShard().unassigned()) {
logger.debug("primary shard {} is unassigned, ignoring", shardId);
final ReplicationResponse response = new ReplicationResponse();
response.setShardInfo(new ReplicationResponse.ShardInfo(shardRoutingTable.size(), shardRoutingTable.size()));
response.setShardInfo(ReplicationResponse.ShardInfo.allSuccessful(shardRoutingTable.size()));
listener.onResponse(response);
return;
}
Expand Down Expand Up @@ -786,7 +786,7 @@ private void sendVerifyShardBlockRequest(
if (shardRoutingTable.primaryShard().unassigned()) {
logger.debug("primary shard {} is unassigned, ignoring", shardId);
final ReplicationResponse response = new ReplicationResponse();
response.setShardInfo(new ReplicationResponse.ShardInfo(shardRoutingTable.size(), shardRoutingTable.size()));
response.setShardInfo(ReplicationResponse.ShardInfo.allSuccessful(shardRoutingTable.size()));
listener.onResponse(response);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testToXContentDoesntIncludeForcedRefreshUnlessForced() throws IOExce
) {
// DocWriteResponse is abstract so we have to sneak a subclass in here to test it.
};
response.setShardInfo(new ShardInfo(1, 1));
response.setShardInfo(ShardInfo.allSuccessful(1));
response.setForcedRefresh(false);
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testToXContent() throws IOException {
{
DeleteResponse response = new DeleteResponse(new ShardId("index", "index_uuid", 0), "id", -1, 0, 7, true);
response.setForcedRefresh(true);
response.setShardInfo(new ReplicationResponse.ShardInfo(10, 5));
response.setShardInfo(ReplicationResponse.ShardInfo.of(10, 5));
String output = Strings.toString(response);
assertEquals(XContentHelper.stripWhitespace("""
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testIndexResponse() {
IndexResponse indexResponse = new IndexResponse(shardId, id, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, version, created);
int total = randomIntBetween(1, 10);
int successful = randomIntBetween(1, 10);
ReplicationResponse.ShardInfo shardInfo = new ReplicationResponse.ShardInfo(total, successful);
ReplicationResponse.ShardInfo shardInfo = ReplicationResponse.ShardInfo.of(total, successful);
indexResponse.setShardInfo(shardInfo);
boolean forcedRefresh = false;
if (randomBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testToXContent() throws IOException {
{
IndexResponse indexResponse = new IndexResponse(new ShardId("index", "index_uuid", 0), "id", -1, 17, 7, true);
indexResponse.setForcedRefresh(true);
indexResponse.setShardInfo(new ReplicationResponse.ShardInfo(10, 5));
indexResponse.setShardInfo(ReplicationResponse.ShardInfo.of(10, 5));
String output = Strings.toString(indexResponse);
assertEquals(XContentHelper.stripWhitespace("""
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testStartedPrimary() throws InterruptedException, ExecutionException
ActionTestUtils.execute(broadcastReplicationAction, null, new DummyBroadcastRequest(index), response);
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
ReplicationResponse replicationResponse = new ReplicationResponse();
replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1));
replicationResponse.setShardInfo(ReplicationResponse.ShardInfo.allSuccessful(1));
shardRequests.v2().onResponse(replicationResponse);
}
logger.info("total shards: {}, ", response.get().getTotalShards());
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testResultCombine() throws InterruptedException, ExecutionException,
);
failed++;
}
replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(2, shardsSucceeded, failures));
replicationResponse.setShardInfo(ReplicationResponse.ShardInfo.of(2, shardsSucceeded, failures));
shardRequests.v2().onResponse(replicationResponse);
} else {
// sometimes fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,34 +35,35 @@ public class ReplicationResponseTests extends ESTestCase {
public void testShardInfoToString() {
final int total = 5;
final int successful = randomIntBetween(1, total);
final ShardInfo shardInfo = new ShardInfo(total, successful);
final ShardInfo shardInfo = ShardInfo.of(total, successful);
assertEquals(Strings.format("ShardInfo{total=5, successful=%d, failures=[]}", successful), shardInfo.toString());
}

public void testShardInfoToXContent() throws IOException {
{
ShardInfo shardInfo = new ShardInfo(5, 3);
ShardInfo shardInfo = ShardInfo.of(5, 3);
String output = Strings.toString(shardInfo);
assertEquals("{\"total\":5,\"successful\":3,\"failed\":0}", output);
}
{
ShardInfo shardInfo = new ShardInfo(
ShardInfo shardInfo = ShardInfo.of(
6,
4,
new ShardInfo.Failure(
new ShardId("index", "_uuid", 3),
"_node_id",
new IllegalArgumentException("Wrong"),
RestStatus.BAD_REQUEST,
false
),
new ShardInfo.Failure(
new ShardId("index", "_uuid", 1),
"_node_id",
new CircuitBreakingException("Wrong", 12, 21, CircuitBreaker.Durability.PERMANENT),
RestStatus.NOT_ACCEPTABLE,
true
)
new ShardInfo.Failure[] {
new ShardInfo.Failure(
new ShardId("index", "_uuid", 3),
"_node_id",
new IllegalArgumentException("Wrong"),
RestStatus.BAD_REQUEST,
false
),
new ShardInfo.Failure(
new ShardId("index", "_uuid", 1),
"_node_id",
new CircuitBreakingException("Wrong", 12, 21, CircuitBreaker.Durability.PERMANENT),
RestStatus.NOT_ACCEPTABLE,
true
) }
);
String output = Strings.toString(shardInfo);
assertEquals(XContentHelper.stripWhitespace("""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1468,7 +1468,7 @@ static class TestResponse extends ReplicationResponse {
}

TestResponse() {
setShardInfo(new ShardInfo());
setShardInfo(ReplicationResponse.ShardInfo.EMPTY);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testToXContent() throws IOException {
}
{
UpdateResponse updateResponse = new UpdateResponse(
new ReplicationResponse.ShardInfo(10, 6),
ReplicationResponse.ShardInfo.of(10, 6),
new ShardId("index", "index_uuid", 1),
"id",
3,
Expand Down Expand Up @@ -94,7 +94,7 @@ public void testToXContent() throws IOException {
fields.put("isbn", new DocumentField("isbn", Collections.singletonList("ABC-123")));

UpdateResponse updateResponse = new UpdateResponse(
new ReplicationResponse.ShardInfo(3, 2),
ReplicationResponse.ShardInfo.of(3, 2),
new ShardId("books", "books_uuid", 2),
"1",
7,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public static Tuple<ShardInfo, ShardInfo> randomShardInfo(Random random) {
public static Tuple<ShardInfo, ShardInfo> randomShardInfo(Random random, boolean withShardFailures) {
int total = randomIntBetween(random, 1, 10);
if (withShardFailures == false) {
return Tuple.tuple(new ShardInfo(total, total), new ShardInfo(total, total));
return Tuple.tuple(ShardInfo.allSuccessful(total), ShardInfo.allSuccessful(total));
}

int successful = randomIntBetween(random, 1, Math.max(1, (total - 1)));
Expand All @@ -260,7 +260,7 @@ public static Tuple<ShardInfo, ShardInfo> randomShardInfo(Random random, boolean
actualFailures[i] = failure.v1();
expectedFailures[i] = failure.v2();
}
return Tuple.tuple(new ShardInfo(total, successful, actualFailures), new ShardInfo(total, successful, expectedFailures));
return Tuple.tuple(ShardInfo.of(total, successful, actualFailures), ShardInfo.of(total, successful, expectedFailures));
}

/**
Expand Down

0 comments on commit 4076aa7

Please sign in to comment.