From 1765a7db94034420237600473018662d66b4abf7 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 23 Sep 2024 13:25:43 -0600 Subject: [PATCH] Default incremental bulk functionality to false This commit flips the incremental bulk setting to false. Additionally, it removes some test code which intermittently causes issues with security test cases. --- .../http/IncrementalBulkRestIT.java | 8 +++ .../action/bulk/IncrementalBulkService.java | 2 +- .../elasticsearch/test/ESIntegTestCase.java | 49 ++----------------- 3 files changed, 13 insertions(+), 46 deletions(-) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index 2b24e53874e51..da05011696274 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -29,6 +29,14 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) public class IncrementalBulkRestIT extends HttpSmokeTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), true) + .build(); + } + public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOException { Request request = new Request("GET", "/_capabilities?method=GET&path=%2F_bulk&capabilities=failure_store_status&pretty"); Response response = getRestClient().performRequest(request); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 7185c4d76265e..fc264de35f510 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -36,7 +36,7 @@ public class IncrementalBulkService { public static final Setting INCREMENTAL_BULK = boolSetting( "rest.incremental_bulk", - true, + false, Setting.Property.NodeScope, Setting.Property.Dynamic ); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 92e480aff3bc9..cca3443c28e3a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; @@ -49,8 +48,6 @@ import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.bulk.IncrementalBulkService; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.DeletePipelineTransportAction; @@ -196,7 +193,6 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -1782,48 +1778,11 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma logger.info("Index [{}] docs async: [{}] bulk: [{}] partitions [{}]", builders.size(), false, true, partition.size()); for (List segmented : partition) { BulkResponse actionGet; - if (randomBoolean()) { - BulkRequestBuilder bulkBuilder = client().prepareBulk(); - for (IndexRequestBuilder indexRequestBuilder : segmented) { - bulkBuilder.add(indexRequestBuilder); - } - actionGet = bulkBuilder.get(); - } else { - IncrementalBulkService bulkService = internalCluster().getInstance(IncrementalBulkService.class); - IncrementalBulkService.Handler handler = bulkService.newBulkRequest(); - - ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); - segmented.forEach(b -> queue.add(b.request())); - - PlainActionFuture future = new PlainActionFuture<>(); - AtomicInteger runs = new AtomicInteger(0); - Runnable r = new Runnable() { - - @Override - public void run() { - int toRemove = Math.min(randomIntBetween(5, 10), queue.size()); - ArrayList> docs = new ArrayList<>(); - for (int i = 0; i < toRemove; i++) { - docs.add(queue.poll()); - } - - if (queue.isEmpty()) { - handler.lastItems(docs, () -> {}, future); - } else { - handler.addItems(docs, () -> {}, () -> { - // Every 10 runs dispatch to new thread to prevent stackoverflow - if (runs.incrementAndGet() % 10 == 0) { - new Thread(this).start(); - } else { - this.run(); - } - }); - } - } - }; - r.run(); - actionGet = future.actionGet(); + BulkRequestBuilder bulkBuilder = client().prepareBulk(); + for (IndexRequestBuilder indexRequestBuilder : segmented) { + bulkBuilder.add(indexRequestBuilder); } + actionGet = bulkBuilder.get(); assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false)); } }