diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1796d984c37ae..34378683cd419 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1563,14 +1563,12 @@ public boolean maybeRefresh(String source) throws EngineException { } final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException { - // we obtain a read lock here, since we don't want a flush to happen while we are refreshing - // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) // both refresh types will result in an internal refresh but only the external will also // pass the new reader reference to the external reader manager. final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint(); boolean refreshed; - try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); + try { + // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way. if (store.tryIncRef()) { // increment the ref just to ensure nobody closes the store during a refresh try { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 95bfa4a03c698..f1db530875aa9 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -136,6 +136,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import java.io.Closeable; @@ -5761,7 +5762,7 @@ public void testMaxSeqNoInCommitUserData() throws Exception { assertMaxSeqNoInCommitUserData(engine); } - public void testRefreshAndFailEngineConcurrently() throws Exception { + public void testRefreshAndCloseEngineConcurrently() throws Exception { AtomicBoolean stopped = new AtomicBoolean(); Semaphore indexedDocs = new Semaphore(0); Thread indexer = new Thread(() -> { @@ -5791,7 +5792,11 @@ public void testRefreshAndFailEngineConcurrently() throws Exception { refresher.start(); indexedDocs.acquire(randomIntBetween(1, 100)); try { - engine.failEngine("test", new IOException("simulated error")); + if (randomBoolean()) { + engine.failEngine("test", new IOException("simulated error")); + } else { + engine.close(); + } } finally { stopped.set(true); indexer.join(); @@ -6133,4 +6138,41 @@ public void afterRefresh(boolean didRefresh) { } } } + + public void testRefreshDoesNotBlockClosing() throws Exception { + final CountDownLatch refreshStarted = new CountDownLatch(1); + final CountDownLatch engineClosed = new CountDownLatch(1); + final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { + + @Override + public void beforeRefresh() { + refreshStarted.countDown(); + try { + engineClosed.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + + @Override + public void afterRefresh(boolean didRefresh) { + assertFalse(didRefresh); + } + }; + try (Store store = createStore()) { + final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, + refreshListener, null, null, engine.config().getCircuitBreakerService()); + try (InternalEngine engine = createEngine(config)) { + if (randomBoolean()) { + engine.index(indexForDoc(createParsedDoc("id", null))); + } + threadPool.executor(ThreadPool.Names.REFRESH).execute(() -> + expectThrows(AlreadyClosedException.class, + () -> engine.refresh("test", randomFrom(Engine.SearcherScope.values()), true))); + refreshStarted.await(); + engine.close(); + engineClosed.countDown(); + } + } + } }