From 8a7e79794bfda51efe777106cd5a1c0bb61ecb1a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 8 Feb 2020 16:18:23 -0500 Subject: [PATCH 1/3] Fix leaking searcher when index got deleted --- .../elasticsearch/search/SearchService.java | 12 +++---- .../search/SearchServiceTests.java | 31 +++++++++++++++++++ 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index bb83a56599532..3d26485bd7994 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -694,14 +694,14 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time } private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) { - final ShardSearchRequest request = rewriteContext.request; - final Engine.Searcher searcher = rewriteContext.searcher; - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.getShard(request.shardId().getId()); - SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), - indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE); boolean success = false; try { + final ShardSearchRequest request = rewriteContext.request; + final Engine.Searcher searcher = rewriteContext.searcher; + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = indexService.getShard(request.shardId().getId()); + SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), + indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE); DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase); success = true; diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index c9dc231f0997b..2e61d63c81117 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -898,4 +898,35 @@ public void onFailure(Exception e) { latch.await(); } } + + public void testDeleteIndexWhileSearch() throws Exception { + createIndex("test"); + int numDocs = randomIntBetween(1, 20); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test").setSource("f", "v").get(); + } + client().admin().indices().prepareRefresh("test").get(); + AtomicBoolean stopped = new AtomicBoolean(false); + Thread[] searchers = new Thread[randomIntBetween(1, 4)]; + CountDownLatch latch = new CountDownLatch(searchers.length); + for (int i = 0; i < searchers.length; i++) { + searchers[i] = new Thread(() -> { + latch.countDown(); + while (stopped.get() == false) { + try { + client().prepareSearch("test").setRequestCache(false).get(); + } catch (Exception ignored) { + return; + } + } + }); + searchers[i].start(); + } + latch.await(); + client().admin().indices().prepareDelete("test").get(); + stopped.set(true); + for (Thread searcher : searchers) { + searcher.join(); + } + } } From 8dfcaafbc8d5ef359e9faac5a67da4f4edf5adb9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 8 Feb 2020 22:23:51 -0500 Subject: [PATCH 2/3] add test for relocation --- .../elasticsearch/recovery/RelocationIT.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 2a29582fdefd3..a04f92d823611 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.IndexEventListener; @@ -86,6 +87,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -456,7 +458,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO } } - public void testIndexAndRelocateConcurrently() throws Exception { + public void testIndexSearchAndRelocateConcurrently() throws Exception { int halfNodes = randomIntBetween(1, 3); Settings[] nodeSettings = Stream.concat( Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes), @@ -473,8 +475,21 @@ public void testIndexAndRelocateConcurrently() throws Exception { .put("index.routing.allocation.exclude.color", "blue") .put(indexSettings()) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)); + if (randomBoolean()) { + settings.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), randomIntBetween(1, 10) + "s"); + } assertAcked(prepareCreate("test", settings)); assertAllShardsOnNodes("test", redNodes); + AtomicBoolean stopped = new AtomicBoolean(false); + Thread[] searchThreads = randomBoolean() ? new Thread[0] : new Thread[randomIntBetween(1, 4)]; + for (int i = 0; i < searchThreads.length; i++) { + searchThreads[i] = new Thread(() -> { + while (stopped.get() == false) { + assertNoFailures(client().prepareSearch("test").setRequestCache(false).get()); + } + }); + searchThreads[i].start(); + } int numDocs = randomIntBetween(100, 150); ArrayList ids = new ArrayList<>(); logger.info(" --> indexing [{}] docs", numDocs); @@ -512,7 +527,10 @@ public void testIndexAndRelocateConcurrently() throws Exception { assertNoFailures(afterRelocation); assertSearchHits(afterRelocation, ids.toArray(new String[ids.size()])); } - + stopped.set(true); + for (Thread searchThread : searchThreads) { + searchThread.join(); + } } public void testRelocateWhileWaitingForRefresh() { From ad5fe76c69633199fb7acac9e3d9ee7fbc6352a7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 9 Feb 2020 15:51:43 -0500 Subject: [PATCH 3/3] update comment --- .../main/java/org/elasticsearch/search/SearchService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 3d26485bd7994..d3afcd5c4677a 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -708,8 +708,9 @@ private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteCon return searchContext; } finally { if (success == false) { - // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise - // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). + // we handle the case where `IndicesService#indexServiceSafe`or `IndexService#getShard`, or the DefaultSearchContext + // constructor throws an exception since we would otherwise leak a searcher and this can have severe implications + // (unable to obtain shard lock exceptions). IOUtils.closeWhileHandlingException(rewriteContext.searcher); } }