diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2613e7ad330ff..8b4bc32dc74bc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1569,14 +1569,12 @@ public boolean maybeRefresh(String source) throws EngineException { } final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException { - // we obtain a read lock here, since we don't want a flush to happen while we are refreshing - // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) // both refresh types will result in an internal refresh but only the external will also // pass the new reader reference to the external reader manager. final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint(); boolean refreshed; - try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); + try { + // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way. if (store.tryIncRef()) { // increment the ref just to ensure nobody closes the store during a refresh try { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 66aaff856b963..eed0a6dcb6b47 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -141,6 +141,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import java.io.Closeable; @@ -5781,7 +5782,7 @@ public void testMaxSeqNoInCommitUserData() throws Exception { assertMaxSeqNoInCommitUserData(engine); } - public void testRefreshAndFailEngineConcurrently() throws Exception { + public void testRefreshAndCloseEngineConcurrently() throws Exception { AtomicBoolean stopped = new AtomicBoolean(); Semaphore indexedDocs = new Semaphore(0); Thread indexer = new Thread(() -> { @@ -5811,7 +5812,11 @@ public void testRefreshAndFailEngineConcurrently() throws Exception { refresher.start(); indexedDocs.acquire(randomIntBetween(1, 100)); try { - engine.failEngine("test", new IOException("simulated error")); + if (randomBoolean()) { + engine.failEngine("test", new IOException("simulated error")); + } else { + engine.close(); + } } finally { stopped.set(true); indexer.join(); @@ -6153,4 +6158,41 @@ public void afterRefresh(boolean didRefresh) { } } } + + public void testRefreshDoesNotBlockClosing() throws Exception { + final CountDownLatch refreshStarted = new CountDownLatch(1); + final CountDownLatch engineClosed = new CountDownLatch(1); + final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { + + @Override + public void beforeRefresh() { + refreshStarted.countDown(); + try { + engineClosed.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + + @Override + public void afterRefresh(boolean didRefresh) { + assertFalse(didRefresh); + } + }; + try (Store store = createStore()) { + final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, + refreshListener, null, null, engine.config().getCircuitBreakerService()); + try (InternalEngine engine = createEngine(config)) { + if (randomBoolean()) { + engine.index(indexForDoc(createParsedDoc("id", null))); + } + threadPool.executor(ThreadPool.Names.REFRESH).execute(() -> + expectThrows(AlreadyClosedException.class, + () -> engine.refresh("test", randomFrom(Engine.SearcherScope.values()), true))); + refreshStarted.await(); + engine.close(); + engineClosed.countDown(); + } + } + } }