From e869e8939be78bd7e42a21746793342e0ca2ca3c Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 26 Jun 2018 11:22:57 +0200 Subject: [PATCH 1/2] Preserve thread context when connecting to remote cluster --- .../transport/RemoteClusterConnection.java | 7 ++- .../RemoteClusterConnectionTests.java | 59 +++++++++++++++++++ 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index c86ea61980a87..91d7b1b372e51 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -369,9 +370,11 @@ void forceConnect() { private void connect(ActionListener connectListener, boolean forceRun) { final boolean runConnect; final Collection> toNotify; + final ActionListener listener = connectListener == null ? null : + ContextPreservingActionListener.wrapPreservingContext(connectListener, transportService.getThreadPool().getThreadContext()); synchronized (queue) { - if (connectListener != null && queue.offer(connectListener) == false) { - connectListener.onFailure(new RejectedExecutionException("connect queue is full")); + if (listener != null && queue.offer(listener) == false) { + listener.onFailure(new RejectedExecutionException("connect queue is full")); return; } if (forceRun == false && queue.isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 0369eda2a8899..6b65edaff962a 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.internal.io.IOUtils; @@ -555,6 +556,64 @@ public void testFetchShards() throws Exception { } } + public void testFetchShardsThreadContextHeader() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + List nodes = Collections.singletonList(seedNode); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + nodes, service, Integer.MAX_VALUE, n -> true)) { + SearchRequest request = new SearchRequest("test-index"); + Thread[] threads = new Thread[10]; + for (int i = 0; i < threads.length; i++) { + final String threadId = Integer.toString(i); + threads[i] = new Thread(() -> { + ThreadContext threadContext = seedTransport.threadPool.getThreadContext(); + threadContext.putHeader("threadId", threadId); + AtomicReference reference = new AtomicReference<>(); + AtomicReference failReference = new AtomicReference<>(); + final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") + .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) + .routing(request.routing()); + CountDownLatch responseLatch = new CountDownLatch(1); + connection.fetchSearchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap( + resp -> { + reference.set(resp); + assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId")); + }, + failReference::set), responseLatch)); + try { + responseLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + assertNull(failReference.get()); + assertNotNull(reference.get()); + ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); + assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); + }); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + assertTrue(connection.assertNoRunningConnections()); + } + } + } + } + public void testFetchShardsSkipUnavailable() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) { From c3e9ec42276f3e801662aa873b89a82dc7b922ca Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 26 Jun 2018 12:03:45 +0200 Subject: [PATCH 2/2] checkstyle --- .../elasticsearch/transport/RemoteClusterConnectionTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 6b65edaff962a..9393a47475a8c 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -593,7 +593,7 @@ public void testFetchShardsThreadContextHeader() throws Exception { try { responseLatch.await(); } catch (InterruptedException e) { - e.printStackTrace(); + throw new RuntimeException(e); } assertNull(failReference.get()); assertNotNull(reference.get());