From 5a02d1a75eec12c8bc783697e7638d977111d841 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 5 Feb 2020 10:49:26 -0700 Subject: [PATCH 1/3] Force execution of finish shard bulk request Currently the shard bulk request can be rejected by the write threadpool after a mapping update. This introduces a scenario where the mapping listener thread will attempt to finish the request and fsync. This thread can potentially be a transport thread. This commit fixes this issue by forcing the finish action to happen on the write threadpool. Fixes #51904. --- .../action/bulk/TransportShardBulkAction.java | 15 ++- .../bulk/TransportShardBulkActionTests.java | 105 ++++++++++++++++++ 2 files changed, 119 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index e709c24bc64c7..46284c8ec419e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -179,7 +179,20 @@ public void onRejection(Exception e) { e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()), context, null); } - finishRequest(); + + // Force the execution to finish the request + executor.execute(new ActionRunnable<>(listener) { + + @Override + protected void doRun() { + finishRequest(); + } + + @Override + public boolean isForceExecution() { + return true; + } + }); } private void finishRequest() { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 5985d6731fdbd..a6f6f3c5ca81e 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; @@ -53,13 +54,18 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.Matchers.arrayWithSize; @@ -809,6 +815,105 @@ public void testRetries() throws Exception { latch.await(); } + public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { + TestThreadPool rejectingThreadPool = new TestThreadPool( + "TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate", + Settings.builder() + .put("thread_pool." + ThreadPool.Names.WRITE + ".size", 1) + .put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", 1) + .build()); + CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + cyclicBarrier.await(); + logger.info("blocking the write executor"); + cyclicBarrier.await(); + logger.info("unblocked the write executor"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + try { + cyclicBarrier.await(); + // Place a task in the queue to block next enqueue + rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {}); + + BulkItemRequest[] items = new BulkItemRequest[2]; + DocWriteRequest writeRequest1 = new IndexRequest("index").id("id") + .source(Requests.INDEX_CONTENT_TYPE, "foo", 1); + DocWriteRequest writeRequest2 = new IndexRequest("index").id("id") + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + items[0] = new BulkItemRequest(0, writeRequest1); + items[1] = new BulkItemRequest(1, writeRequest2); + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + Engine.IndexResult mappingUpdate = + new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap())); + Translog.Location resultLocation1 = new Translog.Location(42, 36, 36); + Translog.Location resultLocation2 = new Translog.Location(42, 42, 42); + Engine.IndexResult success1 = new FakeIndexResult(1, 1, 10, true, resultLocation1); + Engine.IndexResult success2 = new FakeIndexResult(1, 1, 13, true, resultLocation2); + + IndexShard shard = mock(IndexShard.class); + when(shard.shardId()).thenReturn(shardId); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) + .thenReturn(success1, mappingUpdate, success2); + when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong())).thenCallRealMethod(); + when(shard.mapperService()).thenReturn(mock(MapperService.class)); + + randomlySetIgnoredPrimaryResponse(items[0]); + + AtomicInteger updateCalled = new AtomicInteger(); + + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.performOnPrimary( + bulkShardRequest, shard, null, rejectingThreadPool::absoluteTimeInMillis, (update, shardId, listener) -> { + // There should indeed be a mapping update + assertNotNull(update); + updateCalled.incrementAndGet(); + listener.onResponse(null); + try { + // Release blocking task now that the continue write execution has been rejected and + // the finishRequest execution has been force enqueued + cyclicBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new IllegalStateException(e); + } + }, listener -> listener.onResponse(null), new LatchedActionListener<>( + ActionTestUtils.assertNoFailureListener(result -> + // Assert that we still need to fsync the location that was successfully written + assertThat(((WritePrimaryResult) result).location, + equalTo(resultLocation1))), latch), + rejectingThreadPool); + latch.await(); + + assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1)); + + verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); + + BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse(); + assertThat(primaryResponse1.getItemId(), equalTo(0)); + assertThat(primaryResponse1.getId(), equalTo("id")); + assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertFalse(primaryResponse1.isFailed()); + assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED)); + assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L)); + + BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse(); + assertThat(primaryResponse2.getItemId(), equalTo(1)); + assertThat(primaryResponse2.getId(), equalTo("id")); + assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertTrue(primaryResponse2.isFailed()); + assertNull(primaryResponse2.getResponse()); + assertEquals(primaryResponse2.status(), RestStatus.TOO_MANY_REQUESTS); + assertThat(primaryResponse2.getFailure().getCause(), instanceOf(EsRejectedExecutionException.class)); + + closeShards(shard); + } finally { + rejectingThreadPool.shutdownNow(); + } + } + private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { if (randomBoolean()) { // add a response to the request and thereby check that it is ignored for the primary. From 4b0b0ebb8c254d37ba1cf351c46a706750c8ce54 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 5 Feb 2020 12:13:44 -0700 Subject: [PATCH 2/3] Dispatch everything --- .../action/bulk/TransportShardBulkAction.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 46284c8ec419e..62f1635fc0085 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -170,21 +170,20 @@ protected void doRun() throws Exception { @Override public void onRejection(Exception e) { - // Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request - while (context.hasMoreOperationsToExecute()) { - context.setRequestToExecute(context.getCurrent()); - final DocWriteRequest docWriteRequest = context.getRequestToExecute(); - onComplete( - exceptionToResult( - e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()), - context, null); - } - // Force the execution to finish the request executor.execute(new ActionRunnable<>(listener) { @Override protected void doRun() { + // Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request + while (context.hasMoreOperationsToExecute()) { + context.setRequestToExecute(context.getCurrent()); + final DocWriteRequest docWriteRequest = context.getRequestToExecute(); + onComplete( + exceptionToResult( + e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()), + context, null); + } finishRequest(); } From ae403e787122a040e3dce40ad3cbc39ed526e729 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 11 Feb 2020 09:54:09 -0700 Subject: [PATCH 3/3] Comment --- .../elasticsearch/action/bulk/TransportShardBulkAction.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 62f1635fc0085..d3b7174b3f1e0 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -170,7 +170,8 @@ protected void doRun() throws Exception { @Override public void onRejection(Exception e) { - // Force the execution to finish the request + // We must finish the outstanding request. Finishing the outstanding request can include + //refreshing and fsyncing. Therefore, we must force execution on the WRITE thread. executor.execute(new ActionRunnable<>(listener) { @Override