
Commit

Closes #4529
- Reference the Elasticsearch documentation in the bulk chunker size limit config option
- Clean up how the ES bulk request path is built
- Submit the bulk request items that are below the configured limit, then throw for overly large items

Signed-off-by: Allan Clements <[email protected]>
criminosis authored and li-boxuan committed Jun 28, 2024
1 parent d255396 commit a3393a1
Showing 4 changed files with 87 additions and 22 deletions.
docs/configs/janusgraph-cfg.md (2 changes: 1 addition & 1 deletion)
@@ -150,7 +150,7 @@ Elasticsearch index configuration

| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. If a single bulk item exceeds this limit an exception will be thrown after the smaller bulk items are submitted. Ensure that this limit is always less than or equal to the configured limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html). | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
| index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
| index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
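For illustration only, a minimal sketch of setting the `bulk-chunk-size-limit-bytes` option documented above when opening a graph programmatically; the index name `search`, the backend values, and the 50 MB figure are placeholder assumptions, not part of this change:

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class BulkChunkLimitExample {
    public static void main(String[] args) {
        // Hypothetical index name "search" and placeholder backend/hostname values.
        JanusGraph graph = JanusGraphFactory.build()
            .set("storage.backend", "cql")
            .set("index.search.backend", "elasticsearch")
            .set("index.search.hostname", "127.0.0.1")
            // Keep this at or below http.max_content_length on the Elasticsearch
            // servers (Elasticsearch's own default is 100mb).
            .set("index.search.elasticsearch.bulk-chunk-size-limit-bytes", 50_000_000)
            .open();
        graph.close();
    }
}
```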
@@ -327,7 +327,11 @@ public class ElasticSearchIndex implements IndexProvider {
public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
"The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
"chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
"chunked to this size. If a single bulk item exceeds this limit an exception will be thrown after the " +
"smaller bulk items are submitted. Ensure that this limit is always less than or equal to the configured " +
"limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the " +
"[Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html).",
ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

public static final int HOST_PORT_DEFAULT = 9200;

@@ -14,6 +14,7 @@

package org.janusgraph.diskstorage.es.rest;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
@@ -54,6 +55,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -392,11 +394,13 @@ public void clearStore(String indexName, String storeName) throws IOException {
}
}

private class RequestBytes {
@VisibleForTesting
class RequestBytes {
final byte [] requestBytes;
final byte [] requestSource;

private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
@VisibleForTesting
RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
Map<String, Object> requestData = new HashMap<>();
if (useMappingTypes) {
requestData.put("_index", request.getIndex());
@@ -419,7 +423,8 @@ private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
}
}

private int getSerializedSize() {
@VisibleForTesting
int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
@@ -445,15 +450,15 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
request.writeTo(outputStream);
}

final StringBuilder builder = new StringBuilder();
final StringBuilder bulkRequestQueryParameters = new StringBuilder();
if (ingestPipeline != null) {
APPEND_OP.apply(builder).append("pipeline=").append(ingestPipeline);
APPEND_OP.apply(bulkRequestQueryParameters).append("pipeline=").append(ingestPipeline);
}
if (bulkRefreshEnabled) {
APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
}
builder.insert(0, REQUEST_SEPARATOR + "_bulk");
return Pair.with(builder.toString(), outputStream.toByteArray());
final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
return Pair.with(bulkRequestPath, outputStream.toByteArray());
}
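For illustration, a small self-contained sketch of the path assembly above, assuming APPEND_OP prepends `?` for the first query parameter and `&` thereafter; the pipeline and refresh values are placeholders:

```java
public class BulkPathExample {
    public static void main(String[] args) {
        String ingestPipeline = "my-pipeline"; // placeholder value
        String bulkRefresh = "wait_for";       // placeholder value
        StringBuilder params = new StringBuilder();
        if (ingestPipeline != null) {
            params.append(params.length() == 0 ? "?" : "&").append("pipeline=").append(ingestPipeline);
        }
        if (bulkRefresh != null) {
            params.append(params.length() == 0 ? "?" : "&").append("refresh=").append(bulkRefresh);
        }
        // Prints "/_bulk?pipeline=my-pipeline&refresh=wait_for"
        System.out.println("/" + "_bulk" + params);
    }
}
```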

private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
@@ -476,7 +481,8 @@ private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
return errors;
}

private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
@VisibleForTesting
class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
//the specified limit

@@ -485,18 +491,33 @@ private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;
private final int[] exceptionallyLargeRequests;

private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
@VisibleForTesting
BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
for (ElasticSearchMutation request : requests) {
serializedRequests.add(new RequestBytes(request));
RequestBytes requestBytes = new RequestBytes(request);
int requestSerializedSize = requestBytes.getSerializedSize();
if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
//Only keep items that we can actually send in memory
serializedRequests.add(requestBytes);
} else {
requestSizesThatWereTooLarge.add(requestSerializedSize);
}
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
//Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.isEmpty() ? null :
requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
}

@Override
public boolean hasNext() {
return requestIterator.hasNext();
//Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
//This allows next() to throw after all well sized requests have been chunked for submission
return requestIterator.hasNext() || exceptionallyLargeRequests != null;
}

@Override
@@ -505,20 +526,21 @@ public List<RequestBytes> next() {
int chunkSerializedTotal = 0;
while (requestIterator.hasNext()) {
RequestBytes peeked = requestIterator.peek();
int requestSerializedSize = peeked.getSerializedSize();
if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
chunkSerializedTotal += requestSerializedSize;
chunkSerializedTotal += peeked.getSerializedSize();
if (chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
serializedRequests.add(requestIterator.next());
} else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
//we've encountered an element we cannot send to Elasticsearch given the configured limit
throw new IllegalArgumentException(String.format(
"Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
bulkChunkSerializedLimitBytes, requestSerializedSize));
} else {
//Adding this element would exceed the limit, so return the chunk
return serializedRequests;
}
}
//Check if we should throw an exception for items that were exceptionally large and therefore undeliverable.
//This is only done after all items that could be sent have been sent
if (serializedRequests.isEmpty() && this.exceptionallyLargeRequests != null) {
throw new IllegalArgumentException(String.format(
"Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) %s",
bulkChunkSerializedLimitBytes, Arrays.toString(this.exceptionallyLargeRequests)));
}
//All remaining requests fit in this chunk
return serializedRequests;
}
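To make the new iteration contract concrete, a minimal, hypothetical consumption loop as it might look from inside the client; `mutations` and `submitBulkChunk` are placeholders, not code from this change:

```java
// Sketch of the contract above: all well-sized chunks are produced first, hasNext()
// keeps returning true while oversized items remain, and the final next() call throws.
BulkRequestChunker chunker = new BulkRequestChunker(mutations);
while (chunker.hasNext()) {
    // Throws IllegalArgumentException once only the oversized items are left.
    List<RequestBytes> chunk = chunker.next();
    if (!chunk.isEmpty()) {
        submitBulkChunk(chunk); // placeholder for the client's actual bulk submission
    }
}
```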
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

import static org.mockito.ArgumentMatchers.any;
@@ -105,6 +106,44 @@ public void testSplittingOfLargeBulkItems() throws IOException {
}
}

@Test
public void testThrowingForOverlyLargeBulkItemOnlyAfterSmallerItemsAreChunked() throws IOException {
int bulkLimit = 1_000_000;
StringBuilder overlyLargePayloadBuilder = new StringBuilder();
IntStream.range(0, bulkLimit * 10).forEach(value -> overlyLargePayloadBuilder.append("a"));
String overlyLargePayload = overlyLargePayloadBuilder.toString();
ElasticSearchMutation overlyLargeMutation = ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id2",
Collections.singletonMap("someKey", overlyLargePayload));
List<ElasticSearchMutation> bulkItems = Arrays.asList(
ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id1",
Collections.singletonMap("someKey", "small_payload1")),
overlyLargeMutation,
ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id3",
Collections.singletonMap("someKey", "small_payload2"))
);

try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) {
RestElasticSearchClient.BulkRequestChunker chunkerUnderTest = restClientUnderTest.new BulkRequestChunker(bulkItems);
int overlyLargeRequestExpectedSize = restClientUnderTest.new RequestBytes(overlyLargeMutation).getSerializedSize();

//The chunker should chunk this request first as a list of the 2 smaller items
List<RestElasticSearchClient.RequestBytes> smallItemsChunk = chunkerUnderTest.next();
Assertions.assertEquals(2, smallItemsChunk.size());

//Then the chunker should still return true for hasNext()
Assertions.assertTrue(chunkerUnderTest.hasNext());

//Then the next call for next() should throw to report the exceptionally large item
IllegalArgumentException thrownException = Assertions.assertThrows(IllegalArgumentException.class, chunkerUnderTest::next,
"Should have thrown due to bulk request item being too large");

String expectedExceptionMessage = String.format("Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) [%s]",
bulkLimit, overlyLargeRequestExpectedSize);

Assertions.assertEquals(expectedExceptionMessage, thrownException.getMessage());
}
}

@Test
public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException {
int bulkLimit = 800;

1 comment on commit a3393a1

@github-actions


Benchmark

| Benchmark suite | Current: a3393a1 | Previous: d35fc64 | Ratio |
| ---- | ---- | ---- | ---- |
| org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete | 12187.993678628889 ms/op | 12468.444225291154 ms/op | 0.98 |
| org.janusgraph.GraphCentricQueryBenchmark.getVertices | 874.6122588948206 ms/op | 918.3423996712321 ms/op | 0.95 |
| org.janusgraph.MgmtOlapJobBenchmark.runClearIndex | 216.03713029746376 ms/op | 216.65935547028985 ms/op | 1.00 |
| org.janusgraph.MgmtOlapJobBenchmark.runReindex | 339.0628359951923 ms/op | 336.6219472083333 ms/op | 1.01 |
| org.janusgraph.JanusGraphSpeedBenchmark.basicCount | 186.0506224618164 ms/op | 212.17318113485976 ms/op | 0.88 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 4767.89194829091 ms/op | 4934.862177070978 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps | 16244.675173076179 ms/op | 17119.84681319412 ms/op | 0.95 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch | 19275.495860666753 ms/op | 20987.51313056889 ms/op | 0.92 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching | 55078.1050332 ms/op | 57050.64306146667 ms/op | 0.97 |
| org.janusgraph.CQLMultiQueryDropBenchmark.dropVertices | 1458.4466112339442 ms/op | 1559.969298241005 ms/op | 0.93 |
| org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex | 7569.1019352868725 ms/op | 8318.72392928847 ms/op | 0.91 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion | 368.0279239712603 ms/op | 392.59846986303853 ms/op | 0.94 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch | 4055.3720129211715 ms/op | 4240.712364445486 ms/op | 0.96 |
| org.janusgraph.CQLMultiQueryBenchmark.getNames | 7849.213062288612 ms/op | 8406.331584402715 ms/op | 0.93 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 5343.356895944186 ms/op | 5987.64859878139 ms/op | 0.89 |
| org.janusgraph.CQLMultiQueryBenchmark.getLabels | 6775.682161018402 ms/op | 7290.028061798635 ms/op | 0.93 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep | 425.06636857733736 ms/op | 435.76237709334333 ms/op | 0.98 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex | 12181.232973299086 ms/op | 12783.126217693469 ms/op | 0.95 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage | 347.3594025405252 ms/op | 363.9668594437082 ms/op | 0.95 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 14285.030396524864 ms/op | 15378.720663330263 ms/op | 0.93 |
| org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection | 243.56504596004459 ms/op | 253.97183167148802 ms/op | 0.96 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch | 14090.78011571159 ms/op | 14617.308247215036 ms/op | 0.96 |
| org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames | 7893.91706266809 ms/op | 8414.490379754512 ms/op | 0.94 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps | 8570.478228808666 ms/op | 9340.049792997386 ms/op | 0.92 |
| org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts | 8104.956886639752 ms/op | 8726.780923948296 ms/op | 0.93 |

This comment was automatically generated by workflow using github-action-benchmark.
