diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md
index f5fd722f77..e8d07a7043 100644
--- a/docs/configs/janusgraph-cfg.md
+++ b/docs/configs/janusgraph-cfg.md
@@ -150,7 +150,7 @@ Elasticsearch index configuration
 
 | Name | Description | Datatype | Default Value | Mutability |
 | ---- | ---- | ---- | ---- | ---- |
-| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
+| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. If a single bulk item exceeds this limit, an exception will be thrown after the smaller bulk items are submitted. Ensure that this limit is always less than or equal to the configured limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html). | Integer | 100000000 | LOCAL |
 | index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
 | index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
 | index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
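A quick way to sanity-check the documented behavior locally is to open a graph with the option set. This is an illustrative sketch only: the `search` index name, `inmemory` storage backend, local Elasticsearch node, and the 10 MB value are assumptions, not part of this change, and it requires an Elasticsearch instance listening on the configured host.

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class BulkChunkLimitExample {
    public static void main(String[] args) {
        // Assumed values for illustration: index named "search", inmemory storage,
        // a local Elasticsearch node, and a 10 MB chunk limit chosen to stay at or
        // below the cluster's http.max_content_length (100mb by default).
        JanusGraph graph = JanusGraphFactory.build()
            .set("storage.backend", "inmemory")
            .set("index.search.backend", "elasticsearch")
            .set("index.search.hostname", "127.0.0.1")
            .set("index.search.elasticsearch.bulk-chunk-size-limit-bytes", 10_000_000)
            .open();
        graph.close();
    }
}
```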
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
index e357fd20ad..4e3e5a0ff8 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
@@ -327,7 +327,11 @@ public class ElasticSearchIndex implements IndexProvider {
     public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
         new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
         "The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
-        "chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
+        "chunked to this size. If a single bulk item exceeds this limit, an exception will be thrown after the " +
+        "smaller bulk items are submitted. Ensure that this limit is always less than or equal to the configured " +
+        "limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the " +
+        "[Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html).",
+        ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
 
     public static final int HOST_PORT_DEFAULT = 9200;
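For context on how the client below measures each mutation against this limit: `RequestBytes#getSerializedSize()` counts the serialized action line plus one trailing newline and, when a source document is present, that line plus its newline as well (the source branch is partly inferred here, since the hunk below truncates just after the null check). A standalone sketch of the arithmetic, with hypothetical helper and variable names:

```java
import java.nio.charset.StandardCharsets;

public class BulkItemSizeSketch {

    // Mirrors the idea behind RequestBytes#getSerializedSize(): the action line
    // plus its newline and, when present, the source line plus its newline.
    static int serializedSize(byte[] actionLine, byte[] sourceLine) {
        int size = actionLine.length + 1;      // action JSON + '\n'
        if (sourceLine != null) {
            size += sourceLine.length + 1;     // source JSON + '\n'
        }
        return size;
    }

    public static void main(String[] args) {
        byte[] action = "{\"index\":{\"_index\":\"some_index\",\"_id\":\"1\"}}"
            .getBytes(StandardCharsets.UTF_8);
        byte[] source = "{\"someKey\":\"small_payload1\"}".getBytes(StandardCharsets.UTF_8);
        // Bytes this one item contributes to the NDJSON bulk body
        System.out.println(serializedSize(action, source));
    }
}
```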
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
index 59fa0af314..51adc5eb8b 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
@@ -14,6 +14,7 @@
 
 package org.janusgraph.diskstorage.es.rest;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
@@ -54,6 +55,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -392,11 +394,13 @@ public void clearStore(String indexName, String storeName) throws IOException {
         }
     }
 
-    private class RequestBytes {
+    @VisibleForTesting
+    class RequestBytes {
         final byte [] requestBytes;
         final byte [] requestSource;
 
-        private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
+        @VisibleForTesting
+        RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
             Map<String, Object> requestData = new HashMap<>();
             if (useMappingTypes) {
                 requestData.put("_index", request.getIndex());
@@ -419,7 +423,8 @@ private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingE
             }
         }
 
-        private int getSerializedSize() {
+        @VisibleForTesting
+        int getSerializedSize() {
            int serializedSize = this.requestBytes.length;
            serializedSize+= 1; //For follow-up NEW_LINE_BYTES
            if (this.requestSource != null) {
@@ -445,15 +450,15 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
             request.writeTo(outputStream);
         }
 
-        final StringBuilder builder = new StringBuilder();
+        final StringBuilder bulkRequestQueryParameters = new StringBuilder();
         if (ingestPipeline != null) {
-            APPEND_OP.apply(builder).append("pipeline=").append(ingestPipeline);
+            APPEND_OP.apply(bulkRequestQueryParameters).append("pipeline=").append(ingestPipeline);
         }
         if (bulkRefreshEnabled) {
-            APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
+            APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
         }
-        builder.insert(0, REQUEST_SEPARATOR + "_bulk");
-        return Pair.with(builder.toString(), outputStream.toByteArray());
+        final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
+        return Pair.with(bulkRequestPath, outputStream.toByteArray());
     }
 
     private List<Triplet<Object, Integer, ElasticSearchMutation>> pairErrorsWithSubmittedMutation(
@@ -476,7 +481,8 @@ private List<Triplet<Object, Integer, ElasticSearchMutation>> pairErrorsWithSub
         return errors;
     }
 
-    private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
+    @VisibleForTesting
+    class BulkRequestChunker implements Iterator<List<RequestBytes>> {
 
         //By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
         //the specified limit
@@ -485,18 +491,33 @@ private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
         // settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
         // size of a HTTP request to 100mb by default
         private final PeekingIterator<RequestBytes> requestIterator;
+        private final int[] exceptionallyLargeRequests;
 
-        private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
+        @VisibleForTesting
+        BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
             List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
+            List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
             for (ElasticSearchMutation request : requests) {
-                serializedRequests.add(new RequestBytes(request));
+                RequestBytes requestBytes = new RequestBytes(request);
+                int requestSerializedSize = requestBytes.getSerializedSize();
+                if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
+                    //Only keep items that we can actually send in memory
+                    serializedRequests.add(requestBytes);
+                } else {
+                    requestSizesThatWereTooLarge.add(requestSerializedSize);
+                }
             }
             this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
+            //Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
+            this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.isEmpty() ? null :
+                requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
         }
 
         @Override
         public boolean hasNext() {
-            return requestIterator.hasNext();
+            //Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
+            //This allows next() to throw after all well sized requests have been chunked for submission
+            return requestIterator.hasNext() || exceptionallyLargeRequests != null;
         }
 
         @Override
@@ -505,20 +526,21 @@ public List<RequestBytes> next() {
             int chunkSerializedTotal = 0;
             while (requestIterator.hasNext()) {
                 RequestBytes peeked = requestIterator.peek();
-                int requestSerializedSize = peeked.getSerializedSize();
-                if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
-                    chunkSerializedTotal += requestSerializedSize;
+                chunkSerializedTotal += peeked.getSerializedSize();
+                if (chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
                     serializedRequests.add(requestIterator.next());
-                } else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
-                    //we've encountered an element we cannot send to Elasticsearch given the configured limit
-                    throw new IllegalArgumentException(String.format(
-                        "Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
-                        bulkChunkSerializedLimitBytes, requestSerializedSize));
                 } else {
                     //Adding this element would exceed the limit, so return the chunk
                     return serializedRequests;
                 }
             }
+            //Check if we should throw an exception for items that were exceptionally large and therefore undeliverable.
+            //This is only done after all items that could be sent have been sent
+            if (serializedRequests.isEmpty() && this.exceptionallyLargeRequests != null) {
+                throw new IllegalArgumentException(String.format(
+                    "Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) %s",
+                    bulkChunkSerializedLimitBytes, Arrays.toString(this.exceptionallyLargeRequests)));
+            }
             //All remaining requests fit in this chunk
             return serializedRequests;
         }
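The submission loop that drives `BulkRequestChunker` sits outside these hunks, so the new contract is easiest to see in a toy model: all well-sized items are handed out in chunks first, and only when no sendable chunk remains does `next()` throw for the undeliverable ones. A self-contained sketch of that behavior, where plain `int` sizes stand in for serialized requests and every name is invented for illustration:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy model of the chunker's new contract; not the JanusGraph implementation.
public class ThrowLastChunkerSketch implements Iterator<List<Integer>> {

    private final ArrayDeque<Integer> fitting = new ArrayDeque<>(); // items at or under the limit
    private final int[] oversized;                                  // sizes that can never be sent
    private final int limit;

    ThrowLastChunkerSketch(List<Integer> itemSizes, int limit) {
        this.limit = limit;
        List<Integer> over = new ArrayList<>();
        for (int size : itemSizes) {
            if (size <= limit) {
                fitting.add(size);
            } else {
                over.add(size);
            }
        }
        this.oversized = over.isEmpty() ? null : over.stream().mapToInt(Integer::intValue).toArray();
    }

    @Override
    public boolean hasNext() {
        // Stays true while oversized items remain unreported, so a consuming
        // loop only reaches the deferred exception after every sendable chunk.
        return !fitting.isEmpty() || oversized != null;
    }

    @Override
    public List<Integer> next() {
        List<Integer> chunk = new ArrayList<>();
        int total = 0;
        while (!fitting.isEmpty() && total + fitting.peek() <= limit) {
            int item = fitting.poll();
            total += item;
            chunk.add(item);
        }
        if (chunk.isEmpty() && oversized != null) {
            throw new IllegalArgumentException(
                "Item size(s) over limit " + limit + ": " + Arrays.toString(oversized));
        }
        return chunk;
    }

    public static void main(String[] args) {
        // Limit 100: 60 and 50 fit individually but not together; 400 never fits.
        ThrowLastChunkerSketch chunker =
            new ThrowLastChunkerSketch(Arrays.asList(60, 400, 50), 100);
        try {
            while (chunker.hasNext()) {
                System.out.println("submit chunk: " + chunker.next());
            }
        } catch (IllegalArgumentException e) {
            System.out.println("reported after submission: " + e.getMessage());
        }
    }
}
```

Like the real chunker, the sketch throws only once every chunk that can fit under the limit has been handed out.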
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
index 87f4cc1c3f..dae07870d5 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
@@ -34,6 +34,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.stream.IntStream;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -105,6 +106,44 @@ public void testSplittingOfLargeBulkItems() throws IOException {
         }
     }
 
+    @Test
+    public void testThrowingForOverlyLargeBulkItemOnlyAfterSmallerItemsAreChunked() throws IOException {
+        int bulkLimit = 1_000_000;
+        StringBuilder overlyLargePayloadBuilder = new StringBuilder();
+        IntStream.range(0, bulkLimit * 10).forEach(value -> overlyLargePayloadBuilder.append("a"));
+        String overlyLargePayload = overlyLargePayloadBuilder.toString();
+        ElasticSearchMutation overlyLargeMutation = ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id2",
+            Collections.singletonMap("someKey", overlyLargePayload));
+        List<ElasticSearchMutation> bulkItems = Arrays.asList(
+            ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id1",
+                Collections.singletonMap("someKey", "small_payload1")),
+            overlyLargeMutation,
+            ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id3",
+                Collections.singletonMap("someKey", "small_payload2"))
+        );
+
+        try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) {
+            RestElasticSearchClient.BulkRequestChunker chunkerUnderTest = restClientUnderTest.new BulkRequestChunker(bulkItems);
+            int overlyLargeRequestExpectedSize = restClientUnderTest.new RequestBytes(overlyLargeMutation).getSerializedSize();
+
+            //The chunker should chunk this request first as a list of the 2 smaller items
+            List<RestElasticSearchClient.RequestBytes> smallItemsChunk = chunkerUnderTest.next();
+            Assertions.assertEquals(2, smallItemsChunk.size());
+
+            //Then the chunker should still return true for hasNext()
+            Assertions.assertTrue(chunkerUnderTest.hasNext());
+
+            //Then the next call for next() should throw to report the exceptionally large item
+            IllegalArgumentException thrownException = Assertions.assertThrows(IllegalArgumentException.class, chunkerUnderTest::next,
+                "Should have thrown due to bulk request item being too large");
+
+            String expectedExceptionMessage = String.format("Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) [%s]",
+                bulkLimit, overlyLargeRequestExpectedSize);
+
+            Assertions.assertEquals(expectedExceptionMessage, thrownException.getMessage());
+        }
+    }
+
     @Test
     public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException {
         int bulkLimit = 800;
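A small aside on the expected message in the new test: the collected sizes are rendered with `Arrays.toString`, so several oversized items would appear as a bracketed, comma-separated list. An illustration with arbitrary values:

```java
import java.util.Arrays;

public class ChunkLimitMessageSketch {
    public static void main(String[] args) {
        int limit = 800;                          // mirrors the bulkLimit in the tests above
        int[] oversizedItemSizes = {1024, 2048};  // arbitrary example sizes
        System.out.println(String.format(
            "Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) %s",
            limit, Arrays.toString(oversizedItemSizes)));
        // -> Bulk request item(s) larger than permitted chunk limit. Limit is 800. Serialized item size(s) [1024, 2048]
    }
}
```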