Skip to content

Commit

Permalink
Only retry bulk request items that failed
Browse files Browse the repository at this point in the history
  • Loading branch information
criminosis committed May 30, 2024
1 parent 393b432 commit 5c2b132
Showing 1 changed file with 37 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@
import org.janusgraph.diskstorage.es.mapping.TypelessIndexMappings;
import org.janusgraph.diskstorage.es.script.ESScriptResponse;
import org.javatuples.Pair;
import org.javatuples.Triplet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -380,8 +383,7 @@ public void clearStore(String indexName, String storeName) throws IOException {
}
}

@Override
public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
private Pair<String, byte[]> buildBulkRequestInput(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (final ElasticSearchMutation request : requests) {
Map<String, Object> requestData = new HashMap<>();
Expand Down Expand Up @@ -416,24 +418,49 @@ public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipel
APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
}
builder.insert(0, REQUEST_SEPARATOR + "_bulk");
return Pair.with(builder.toString(), outputStream.toByteArray());
}

private List<Triplet<Object, Integer, ElasticSearchMutation>> pairErrorsWithSubmittedMutation(
//Bulk API is documented to return bulk item responses in the same order of submission
//https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body
//As such we only need to retry elements that failed
final List<Map<String, RestBulkResponse.RestBulkItemResponse>> bulkResponseItems,
final List<ElasticSearchMutation> submittedBulkRequestItems) {
final List<Triplet<Object, Integer, ElasticSearchMutation>> errors = new ArrayList<>(bulkResponseItems.size());
for (int itemIndex = 0; itemIndex < bulkResponseItems.size(); itemIndex++) {
Collection<RestBulkResponse.RestBulkItemResponse> bulkResponseItem = bulkResponseItems.get(itemIndex).values();
if (bulkResponseItem.size() > 1) {
throw new IllegalStateException("There should only be a single item per bulk reponse item entry");
}
RestBulkResponse.RestBulkItemResponse item = bulkResponseItem.iterator().next();
if (item.getError() != null && item.getStatus() != 404) {
errors.add(Triplet.with(item.getError(), item.getStatus(), submittedBulkRequestItems.get(itemIndex)));
}
}
return errors;
}

@Override
public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
List<ElasticSearchMutation> requestsToSend = requests;
int retryCount = 0;
while (true) {
final Response response = performRequest(REQUEST_TYPE_POST, builder.toString(), outputStream.toByteArray());
final Pair<String, byte[]> bulkRequestInput = buildBulkRequestInput(requestsToSend, ingestPipeline);
final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
final List<Pair<Object, Integer>> errors = bulkResponse.getItems().stream()
.flatMap(item -> item.values().stream())
.filter(item -> item.getError() != null && item.getStatus() != 404)
.map(item -> Pair.with(item.getError(), item.getStatus())).collect(Collectors.toList());
if (!errors.isEmpty()) {
List<Triplet<Object, Integer, ElasticSearchMutation>> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), requestsToSend);
if (!bulkItemsThatFailed.isEmpty()) {
//Only retry the bulk request if *all* the bulk response item error codes are retry error codes
final Set<Integer> errorCodes = errors.stream().map(Pair::getValue1).collect(Collectors.toSet());
final Set<Integer> errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet());
if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
//Build up the next request batch, of only the failed mutations
requestsToSend = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList());
performRetryWait(retryCount);
retryCount++;
} else {
final List<Object> errorItems = errors.stream().map(Pair::getValue0).collect(Collectors.toList());
final List<Object> errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList());
errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems);
}
Expand Down

1 comment on commit 5c2b132

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 5c2b132 Previous: 487e10c Ratio
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 13064.487101517389 ms/op 12689.439979944173 ms/op 1.03
org.janusgraph.GraphCentricQueryBenchmark.getVertices 917.1542229838899 ms/op 925.013828296019 ms/op 0.99
org.janusgraph.MgmtOlapJobBenchmark.runClearIndex 216.40419136630436 ms/op 216.46225680797102 ms/op 1.00
org.janusgraph.MgmtOlapJobBenchmark.runReindex 341.24780739464285 ms/op 340.16287398523804 ms/op 1.00
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 256.7825195367334 ms/op 217.50069746084958 ms/op 1.18
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 5006.825848074439 ms/op 5053.275978996042 ms/op 0.99
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps 17402.067540816788 ms/op 16275.762675504468 ms/op 1.07
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch 20367.73059507818 ms/op 19161.81803811333 ms/op 1.06
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching 56536.5839815 ms/op 54807.3874225 ms/op 1.03
org.janusgraph.CQLMultiQueryDropBenchmark.dropVertices 1601.5092235715454 ms/op 1525.6592376795456 ms/op 1.05
org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex 8383.487556671309 ms/op 8025.759033073809 ms/op 1.04
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion 384.9492693973299 ms/op 376.13373544806296 ms/op 1.02
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch 4166.5354476095035 ms/op 4086.6631890165872 ms/op 1.02
org.janusgraph.CQLMultiQueryBenchmark.getNames 8476.082331762345 ms/op 8416.939881828739 ms/op 1.01
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 5847.76823022735 ms/op 5611.851519805606 ms/op 1.04
org.janusgraph.CQLMultiQueryBenchmark.getLabels 7476.94776189832 ms/op 6828.816333819391 ms/op 1.09
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep 441.04533640656797 ms/op 434.35062048578123 ms/op 1.02
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex 12656.998816571519 ms/op 11980.571977528334 ms/op 1.06
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage 368.40111770067546 ms/op 355.802058751578 ms/op 1.04
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 14690.792975110715 ms/op 14729.77276273078 ms/op 1.00
org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection 250.6826633725123 ms/op 243.61035626593375 ms/op 1.03
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch 15589.637740560207 ms/op 13656.572724210222 ms/op 1.14
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 8673.174343363138 ms/op 8290.3478513332 ms/op 1.05
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps 9349.298626702563 ms/op 9070.803732347935 ms/op 1.03
org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts 8916.81141455257 ms/op 8637.306367440338 ms/op 1.03

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.