diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index b08c93a4dc240..1a391a05add58 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -137,7 +137,10 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); currentRequestStream = null; } else { - var contentStream = new Netty4HttpRequestBodyStream(ctx.channel()); + var contentStream = new Netty4HttpRequestBodyStream( + ctx.channel(), + serverTransport.getThreadPool().getThreadContext() + ); currentRequestStream = contentStream; netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 9a0dc09b7566c..238faa7a9237e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -16,6 +16,7 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasables; import org.elasticsearch.http.HttpBody; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -34,14 +35,18 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final Channel channel; private final ChannelFutureListener closeListener = future -> doClose(); private final List tracingHandlers = new ArrayList<>(4); + private final ThreadContext threadContext; private ByteBuf buf; private boolean hasLast = false; private boolean requested = false; private boolean closing = false; private HttpBody.ChunkHandler handler; + private ThreadContext.StoredContext requestContext; - public Netty4HttpRequestBodyStream(Channel channel) { + public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) { this.channel = channel; + this.threadContext = threadContext; + this.requestContext = threadContext.newStoredContext(); Netty4Utils.addListener(channel.closeFuture(), closeListener); channel.config().setAutoRead(false); } @@ -66,6 +71,7 @@ public void addTracingHandler(ChunkHandler chunkHandler) { public void next() { assert closing == false : "cannot request next chunk on closing stream"; assert handler != null : "handler must be set before requesting next chunk"; + requestContext = threadContext.newStoredContext(); channel.eventLoop().submit(() -> { requested = true; if (buf == null) { @@ -108,11 +114,6 @@ private void addChunk(ByteBuf chunk) { } } - // visible for test - Channel channel() { - return channel; - } - // visible for test ByteBuf buf() { return buf; @@ -129,10 +130,12 @@ private void send() { var bytesRef = Netty4Utils.toReleasableBytesReference(buf); requested = false; buf = null; - for (var tracer : tracingHandlers) { - tracer.onNext(bytesRef, hasLast); + try (var ignored = threadContext.restoreExistingContext(requestContext)) { + for (var tracer : tracingHandlers) { + tracer.onNext(bytesRef, hasLast); + } + handler.onNext(bytesRef, hasLast); } - handler.onNext(bytesRef, hasLast); if (hasLast) { channel.config().setAutoRead(true); channel.closeFuture().removeListener(closeListener); @@ -150,11 +153,13 @@ public void close() { private void doClose() { closing = true; - for (var tracer : tracingHandlers) { - Releasables.closeExpectNoException(tracer); - } - if (handler != null) { - handler.close(); + try (var ignored = threadContext.restoreExistingContext(requestContext)) { + for (var tracer : tracingHandlers) { + Releasables.closeExpectNoException(tracer); + } + if (handler != null) { + handler.close(); + } } if (buf != null) { buf.release(); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index f495883631a4e..5ff5a27e2d551 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -19,24 +19,33 @@ import io.netty.handler.flow.FlowControlHandler; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.HttpBody; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.hasEntry; public class Netty4HttpRequestBodyStreamTests extends ESTestCase { - EmbeddedChannel channel; - Netty4HttpRequestBodyStream stream; + private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + private EmbeddedChannel channel; + private Netty4HttpRequestBodyStream stream; static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close(); @Override public void setUp() throws Exception { super.setUp(); channel = new EmbeddedChannel(); - stream = new Netty4HttpRequestBodyStream(channel); + threadContext.putHeader("header1", "value1"); + stream = new Netty4HttpRequestBodyStream(channel, threadContext); stream.setHandler(discardHandler); // set default handler, each test might override one channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { @Override @@ -118,6 +127,60 @@ public void testReadFromChannel() { assertTrue("should receive last content", gotLast.get()); } + public void testReadFromHasCorrectThreadContext() throws InterruptedException { + var gotLast = new AtomicBoolean(false); + AtomicReference> headers = new AtomicReference<>(); + stream.setHandler(new HttpBody.ChunkHandler() { + @Override + public void onNext(ReleasableBytesReference chunk, boolean isLast) { + headers.set(threadContext.getHeaders()); + gotLast.set(isLast); + chunk.close(); + } + + @Override + public void close() { + headers.set(threadContext.getHeaders()); + } + }); + channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read() + var chunkSize = 1024; + + channel.writeInbound(randomContent(chunkSize)); + channel.writeInbound(randomLastContent(chunkSize)); + + threadContext.putHeader("header2", "value2"); + stream.next(); + + Thread thread = new Thread(() -> channel.runPendingTasks()); + thread.start(); + thread.join(); + + assertThat(headers.get(), hasEntry("header1", "value1")); + assertThat(headers.get(), hasEntry("header2", "value2")); + + threadContext.putHeader("header3", "value3"); + stream.next(); + + thread = new Thread(() -> channel.runPendingTasks()); + thread.start(); + thread.join(); + + assertThat(headers.get(), hasEntry("header1", "value1")); + assertThat(headers.get(), hasEntry("header2", "value2")); + assertThat(headers.get(), hasEntry("header3", "value3")); + + assertTrue("should receive last content", gotLast.get()); + + headers.set(new HashMap<>()); + + stream.close(); + + assertThat(headers.get(), hasEntry("header1", "value1")); + assertThat(headers.get(), hasEntry("header2", "value2")); + assertThat(headers.get(), hasEntry("header3", "value3")); + } + HttpContent randomContent(int size, boolean isLast) { var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); if (isLast) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 2e7c87301b2f6..6ce198260ba3c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -43,12 +42,10 @@ public class IncrementalBulkService { private final Client client; private final AtomicBoolean enabledForTests = new AtomicBoolean(true); private final IndexingPressure indexingPressure; - private final ThreadContext threadContext; - public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) { + public IncrementalBulkService(Client client, IndexingPressure indexingPressure) { this.client = client; this.indexingPressure = indexingPressure; - this.threadContext = threadContext; } public Handler newBulkRequest() { @@ -58,7 +55,7 @@ public Handler newBulkRequest() { public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { ensureEnabled(); - return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh); + return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh); } private void ensureEnabled() { @@ -94,7 +91,6 @@ public static class Handler implements Releasable { public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true); private final Client client; - private final ThreadContext threadContext; private final IndexingPressure indexingPressure; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; @@ -106,22 +102,18 @@ public static class Handler implements Releasable { private boolean globalFailure = false; private boolean incrementalRequestSubmitted = false; private boolean bulkInProgress = false; - private ThreadContext.StoredContext requestContext; private Exception bulkActionLevelFailure = null; private long currentBulkSize = 0L; private BulkRequest bulkRequest = null; protected Handler( Client client, - ThreadContext threadContext, IndexingPressure indexingPressure, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh ) { this.client = client; - this.threadContext = threadContext; - this.requestContext = threadContext.newStoredContext(); this.indexingPressure = indexingPressure; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; @@ -141,31 +133,28 @@ public void addItems(List> items, Releasable releasable, Runn if (shouldBackOff()) { final boolean isFirstRequest = incrementalRequestSubmitted == false; incrementalRequestSubmitted = true; - try (var ignored = threadContext.restoreExistingContext(requestContext)) { - final ArrayList toRelease = new ArrayList<>(releasables); - releasables.clear(); - bulkInProgress = true; - client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { - - @Override - public void onResponse(BulkResponse bulkResponse) { - handleBulkSuccess(bulkResponse); - createNewBulkRequest( - new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true) - ); - } - - @Override - public void onFailure(Exception e) { - handleBulkFailure(isFirstRequest, e); - } - }, () -> { - bulkInProgress = false; - requestContext = threadContext.newStoredContext(); - toRelease.forEach(Releasable::close); - nextItems.run(); - })); - } + final ArrayList toRelease = new ArrayList<>(releasables); + releasables.clear(); + bulkInProgress = true; + client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { + + @Override + public void onResponse(BulkResponse bulkResponse) { + handleBulkSuccess(bulkResponse); + createNewBulkRequest( + new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true) + ); + } + + @Override + public void onFailure(Exception e) { + handleBulkFailure(isFirstRequest, e); + } + }, () -> { + bulkInProgress = false; + toRelease.forEach(Releasable::close); + nextItems.run(); + })); } else { nextItems.run(); } @@ -187,28 +176,26 @@ public void lastItems(List> items, Releasable releasable, Act } else { assert bulkRequest != null; if (internalAddItems(items, releasable)) { - try (var ignored = threadContext.restoreExistingContext(requestContext)) { - final ArrayList toRelease = new ArrayList<>(releasables); - releasables.clear(); - // We do not need to set this back to false as this will be the last request. - bulkInProgress = true; - client.bulk(bulkRequest, ActionListener.runBefore(new ActionListener<>() { - - private final boolean isFirstRequest = incrementalRequestSubmitted == false; - - @Override - public void onResponse(BulkResponse bulkResponse) { - handleBulkSuccess(bulkResponse); - listener.onResponse(combineResponses()); - } + final ArrayList toRelease = new ArrayList<>(releasables); + releasables.clear(); + // We do not need to set this back to false as this will be the last request. + bulkInProgress = true; + client.bulk(bulkRequest, ActionListener.runBefore(new ActionListener<>() { + + private final boolean isFirstRequest = incrementalRequestSubmitted == false; + + @Override + public void onResponse(BulkResponse bulkResponse) { + handleBulkSuccess(bulkResponse); + listener.onResponse(combineResponses()); + } - @Override - public void onFailure(Exception e) { - handleBulkFailure(isFirstRequest, e); - errorResponse(listener); - } - }, () -> toRelease.forEach(Releasable::close))); - } + @Override + public void onFailure(Exception e) { + handleBulkFailure(isFirstRequest, e); + errorResponse(listener); + } + }, () -> toRelease.forEach(Releasable::close))); } else { errorResponse(listener); } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 0a88a202ac8d3..5354b1097326b 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -915,11 +915,7 @@ private void construct( terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null); final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService( - client, - indexingLimits, - threadPool.getThreadContext() - ); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits); ActionModule actionModule = new ActionModule( settings, diff --git a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java index c8cf0bf93879b..f1b59ed14cefb 100644 --- a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java +++ b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java @@ -125,6 +125,7 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl if (request.isStreamedContent()) { assert action instanceof RequestBodyChunkConsumer; var chunkConsumer = (RequestBodyChunkConsumer) action; + request.contentStream().setHandler(new HttpBody.ChunkHandler() { @Override public void onNext(ReleasableBytesReference chunk, boolean isLast) { diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 1e80e6de60d65..7b82481d3d283 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -173,8 +173,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false); this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); - // TODO: Fix type deprecation logging - this.parser = new BulkRequestParser(false, request.getRestApiVersion()); + this.parser = new BulkRequestParser(true, request.getRestApiVersion()); this.handlerSupplier = handlerSupplier; } diff --git a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java index 871062a687429..8d3561f2179cd 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java @@ -131,7 +131,7 @@ public void testSetupRestHandlerContainsKnownBuiltin() { null, List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(null, null) ); actionModule.initRestHandlers(null, null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -196,7 +196,7 @@ public String getName() { null, List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(null, null) ); Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null)); assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/_nodes] for method: GET")); @@ -254,7 +254,7 @@ public List getRestHandlers( null, List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(null, null) ); actionModule.initRestHandlers(null, null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -305,7 +305,7 @@ public void test3rdPartyHandlerIsNotInstalled() { null, List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(null, null) ) ); assertThat( @@ -347,7 +347,7 @@ public void test3rdPartyRestControllerIsNotInstalled() { null, List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(null, null) ) ); assertThat( diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index 77133516f37d5..cf623e77f740a 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -1179,7 +1179,7 @@ public Collection getRestHeaders() { null, List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(null, null) ); } diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index d3cd6dd9ca420..25cfd1e56514c 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -20,8 +20,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasable; import org.elasticsearch.http.HttpBody; import org.elasticsearch.index.IndexVersion; @@ -67,7 +65,7 @@ public void bulk(BulkRequest request, ActionListener listener) { params.put("pipeline", "timestamps"); new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray(""" {"index":{"_id":"1"}} @@ -102,7 +100,7 @@ public void bulk(BulkRequest request, ActionListener listener) { { new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -126,7 +124,7 @@ public void bulk(BulkRequest request, ActionListener listener) { bulkCalled.set(false); new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -149,7 +147,7 @@ public void bulk(BulkRequest request, ActionListener listener) { bulkCalled.set(false); new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -173,7 +171,7 @@ public void bulk(BulkRequest request, ActionListener listener) { bulkCalled.set(false); new RestBulkAction( settings(IndexVersion.current()).build(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -229,7 +227,7 @@ public void next() { RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler( true, request, - () -> new IncrementalBulkService.Handler(null, new ThreadContext(Settings.EMPTY), null, null, null, null) { + () -> new IncrementalBulkService.Handler(null, null, null, null, null) { @Override public void addItems(List> items, Releasable releasable, Runnable nextItems) { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index 8d580f10e5137..c0e55992df88f 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -824,7 +824,7 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc null, List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null, new ThreadContext(Settings.EMPTY)) + new IncrementalBulkService(null, null) ); actionModule.initRestHandlers(null, null);