From f2ac1162423329b7f3b5f911bf16087795640a6f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 23 Oct 2019 10:20:23 -0400 Subject: [PATCH 1/3] Refresh should not acquire readLock --- .../org/elasticsearch/index/engine/InternalEngine.java | 6 ++---- .../elasticsearch/index/engine/InternalEngineTests.java | 8 ++++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1796d984c37ae..fcad87b9d0dbc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1563,14 +1563,12 @@ public boolean maybeRefresh(String source) throws EngineException { } final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException { - // we obtain a read lock here, since we don't want a flush to happen while we are refreshing - // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) // both refresh types will result in an internal refresh but only the external will also // pass the new reader reference to the external reader manager. final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint(); boolean refreshed; - try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); + try { + // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in the meantime. if (store.tryIncRef()) { // increment the ref just to ensure nobody closes the store during a refresh try { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 95bfa4a03c698..27a4b396d345d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5761,7 +5761,7 @@ public void testMaxSeqNoInCommitUserData() throws Exception { assertMaxSeqNoInCommitUserData(engine); } - public void testRefreshAndFailEngineConcurrently() throws Exception { + public void testRefreshAndCloseEngineConcurrently() throws Exception { AtomicBoolean stopped = new AtomicBoolean(); Semaphore indexedDocs = new Semaphore(0); Thread indexer = new Thread(() -> { @@ -5791,7 +5791,11 @@ public void testRefreshAndFailEngineConcurrently() throws Exception { refresher.start(); indexedDocs.acquire(randomIntBetween(1, 100)); try { - engine.failEngine("test", new IOException("simulated error")); + if (randomBoolean()) { + engine.failEngine("test", new IOException("simulated error")); + } else { + engine.close(); + } } finally { stopped.set(true); indexer.join(); From 449b463de2742cca29052d09d7ff6a2b87199e5b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 23 Oct 2019 22:01:30 -0400 Subject: [PATCH 2/3] comment --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fcad87b9d0dbc..34378683cd419 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1568,7 +1568,7 @@ final boolean refresh(String source, SearcherScope scope, boolean block) throws final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint(); boolean refreshed; try { - // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in the meantime. + // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way. if (store.tryIncRef()) { // increment the ref just to ensure nobody closes the store during a refresh try { From 37e1558c5e3b3cfafb863067cd2bf8d550298529 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 24 Oct 2019 17:50:49 -0400 Subject: [PATCH 3/3] add test verifying that refresh does not block closing --- .../index/engine/InternalEngineTests.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 27a4b396d345d..f1db530875aa9 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -136,6 +136,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import java.io.Closeable; @@ -6137,4 +6138,41 @@ public void afterRefresh(boolean didRefresh) { } } } + + public void testRefreshDoesNotBlockClosing() throws Exception { + final CountDownLatch refreshStarted = new CountDownLatch(1); + final CountDownLatch engineClosed = new CountDownLatch(1); + final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { + + @Override + public void beforeRefresh() { + refreshStarted.countDown(); + try { + engineClosed.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + + @Override + public void afterRefresh(boolean didRefresh) { + assertFalse(didRefresh); + } + }; + try (Store store = createStore()) { + final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, + refreshListener, null, null, engine.config().getCircuitBreakerService()); + try (InternalEngine engine = createEngine(config)) { + if (randomBoolean()) { + engine.index(indexForDoc(createParsedDoc("id", null))); + } + threadPool.executor(ThreadPool.Names.REFRESH).execute(() -> + expectThrows(AlreadyClosedException.class, + () -> engine.refresh("test", randomFrom(Engine.SearcherScope.values()), true))); + refreshStarted.await(); + engine.close(); + engineClosed.countDown(); + } + } + } }