Skip to content

Commit

Permalink
Handle missing locations
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmitterdorfer committed Oct 26, 2023
1 parent 6887607 commit b60b0b9
Showing 1 changed file with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import static org.elasticsearch.core.Strings.format;

public class TransportShardMultiGetAction extends TransportSingleShardAction<MultiGetShardRequest, MultiGetShardResponse> {
private static final boolean USE_BATCHED_GET = Booleans.parseBoolean(System.getProperty("es.mget.batched", "false"));
private static final boolean USE_BATCHED_GET = Booleans.parseBoolean(System.getProperty("es.mget.batched", "false"));

private static final String ACTION_NAME = MultiGetAction.NAME + "[shard]";
public static final ActionType<MultiGetShardResponse> TYPE = new ActionType<>(ACTION_NAME, MultiGetShardResponse::new);
Expand Down Expand Up @@ -145,7 +145,7 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
@Override
protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, ShardId shardId) {
MultiGetShardResponse response = new MultiGetShardResponse();
getAndAddToResponse(request, response, shardId);
getAndAddToResponse(request, response, shardId, false);
return response;
}

Expand Down Expand Up @@ -229,27 +229,37 @@ private void handleMultiGetOnUnpromotableShard(

private MultiGetShardResponse handleLocalGets(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) {
logger.trace("handling local gets for missing locations");
getAndAddToResponse(request, response, shardId);
getAndAddToResponse(request, response, shardId, true);
return response;
}

private void getAndAddToResponse(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) {
private void getAndAddToResponse(
MultiGetShardRequest request,
MultiGetShardResponse response,
ShardId shardId,
boolean expectMissingLocations
) {
long start = System.nanoTime();
if (USE_BATCHED_GET) {
getAndAddToResponseBatched(request, response, shardId);
getAndAddToResponseBatched(request, response, shardId, expectMissingLocations);
logger.info("Batched mget took=[" + (System.nanoTime() - start) / 1_000_000 + "] ms.");
} else {
getAndAddToResponseSingle(request, response, shardId);
getAndAddToResponseSingle(request, response, shardId, expectMissingLocations);
logger.info("Single mget took=[" + (System.nanoTime() - start) / 1_000_000 + "] ms.");
}
}

private void getAndAddToResponseSingle(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) {
private void getAndAddToResponseSingle(
MultiGetShardRequest request,
MultiGetShardResponse response,
ShardId shardId,
boolean expectMissingLocations
) {
var indexShard = getIndexShard(shardId);
final int numRequests = request.locations.size();
final GetAndFetchContext[] getAndFetchContexts = new GetAndFetchContext[numRequests];
for (int location = 0; location < request.locations.size(); location++) {
if (response.responses.get(location) == null && response.failures.get(location) == null) {
if (expectMissingLocations == false || (response.responses.get(location) == null && response.failures.get(location) == null)) {
MultiGetRequest.Item item = request.items.get(location);
try {
GetResult getResult = indexShard.getService()
Expand Down Expand Up @@ -277,12 +287,18 @@ private void getAndAddToResponseSingle(MultiGetShardRequest request, MultiGetSha
}
}
}
private void getAndAddToResponseBatched(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) {

private void getAndAddToResponseBatched(
MultiGetShardRequest request,
MultiGetShardResponse response,
ShardId shardId,
boolean expectMissingLocations
) {
var indexShard = getIndexShard(shardId);
final int numRequests = request.locations.size();
final GetAndFetchContext[] getAndFetchContexts = new GetAndFetchContext[numRequests];
for (int location = 0; location < request.locations.size(); location++) {
if (response.responses.get(location) == null && response.failures.get(location) == null) {
if (expectMissingLocations == false || (response.responses.get(location) == null && response.failures.get(location) == null)) {
MultiGetRequest.Item item = request.items.get(location);
// try {
/*
Expand Down

0 comments on commit b60b0b9

Please sign in to comment.