From 7b42081a344385ad9208567ce4e6c97e697aa0a4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 15 Feb 2019 13:42:02 -0500 Subject: [PATCH] Advance max_seq_no before add operation to Lucene (#38879) Today when processing an operation on a replica engine (or the following engine), we first add it to Lucene, then add it to translog, then finally marks its seq_no as completed. If a flush occurs after step1, but before step-3, the max_seq_no in the commit's user_data will be smaller than the seq_no of some documents in the Lucene commit. --- .../index/engine/InternalEngine.java | 10 ++++ .../index/seqno/LocalCheckpointTracker.java | 9 ++++ .../TransportClientNodesServiceTests.java | 6 +-- .../index/engine/InternalEngineTests.java | 38 +++++++++++++++ .../transport/TransportActionProxyTests.java | 8 +--- .../index/engine/EngineTestCase.java | 42 +++++++++++----- .../org/elasticsearch/test/ESTestCase.java | 2 +- .../AbstractSimpleTransportTestCase.java | 8 +--- .../ccr/index/engine/FollowingEngine.java | 2 + .../index/engine/FollowingEngineTests.java | 48 +++++++++++++++++++ 10 files changed, 142 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ca3fedc989f9b..0225532efd23e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1004,6 +1004,7 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 } } } + markSeqNoAsSeen(index.seqNo()); return plan; } @@ -1371,6 +1372,7 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 delete.seqNo(), delete.version()); } } + markSeqNoAsSeen(delete.seqNo()); return plan; } @@ -1525,6 +1527,7 @@ public void maybePruneDeletes() { public NoOpResult noOp(final NoOp noOp) { NoOpResult noOpResult; try (ReleasableLock ignored = readLock.acquire()) { + markSeqNoAsSeen(noOp.seqNo()); noOpResult = innerNoOp(noOp); } catch (final Exception e) { noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e); @@ -2498,6 +2501,13 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException { localCheckpointTracker.waitForOpsToComplete(seqNo); } + /** + * Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value. + */ + protected final void markSeqNoAsSeen(long seqNo) { + localCheckpointTracker.advanceMaxSeqNo(seqNo); + } + /** * Checks if the given operation has been processed in this engine or not. * @return true if the given operation was processed; otherwise false. diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 8249e2600ad55..a58736e26ff3a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -81,6 +81,15 @@ public synchronized long generateSeqNo() { return nextSeqNo++; } + /** + * Marks the provided sequence number as seen and updates the max_seq_no if needed. + */ + public synchronized void advanceMaxSeqNo(long seqNo) { + if (seqNo >= nextSeqNo) { + nextSeqNo = seqNo + 1; + } + } + /** * Marks the processing of the provided sequence number as completed as updates the checkpoint if possible. * diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 36c449e791852..ecdf56d0d8b92 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -215,11 +215,7 @@ public void close() { transport.endConnectMode(); transportService.stop(); transportClientNodesService.close(); - try { - terminate(threadPool); - } catch (InterruptedException e) { - throw new AssertionError(e); - } + terminate(threadPool); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 3f8d936ac6bb0..f81aee412d78b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5581,4 +5581,42 @@ void assertLuceneOperations(InternalEngine engine, long expectedAppends, long ex assertThat(message, engine.getNumDocUpdates(), equalTo(expectedUpdates)); assertThat(message, engine.getNumDocDeletes(), equalTo(expectedDeletes)); } + + public void testMaxSeqNoInCommitUserData() throws Exception { + AtomicBoolean running = new AtomicBoolean(true); + Thread rollTranslog = new Thread(() -> { + while (running.get() && engine.getTranslog().currentFileGeneration() < 500) { + engine.rollTranslogGeneration(); // make adding operations to translog slower + } + }); + rollTranslog.start(); + + Thread indexing = new Thread(() -> { + long seqNo = 0; + while (running.get() && seqNo <= 1000) { + try { + String id = Long.toString(between(1, 50)); + if (randomBoolean()) { + ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 1L, seqNo, false)); + } else { + engine.delete(replicaDeleteForDoc(id, 1L, seqNo, 0L)); + } + seqNo++; + } catch (IOException e) { + throw new AssertionError(e); + } + } + }); + indexing.start(); + + int numCommits = between(5, 20); + for (int i = 0; i < numCommits; i++) { + engine.flush(false, true); + } + running.set(false); + indexing.join(); + rollTranslog.join(); + assertMaxSeqNoInCommitUserData(engine); + } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index 096a410ee1484..ec0c402743344 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -68,13 +68,7 @@ public void setUp() throws Exception { @Override public void tearDown() throws Exception { super.tearDown(); - IOUtils.close(serviceA, serviceB, serviceC, () -> { - try { - terminate(threadPool); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); + IOUtils.close(serviceA, serviceB, serviceC, () -> terminate(threadPool)); } private MockTransportService buildService(final Version version) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 3deb064528fb5..9f4f24bfcced5 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -27,6 +27,8 @@ import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReader; @@ -124,6 +126,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; public abstract class EngineTestCase extends ESTestCase { @@ -251,18 +254,20 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @After public void tearDown() throws Exception { super.tearDown(); - if (engine != null && engine.isClosed.get() == false) { - engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); - } - if (replicaEngine != null && replicaEngine.isClosed.get() == false) { - replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); + try { + if (engine != null && engine.isClosed.get() == false) { + engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); + assertMaxSeqNoInCommitUserData(engine); + } + if (replicaEngine != null && replicaEngine.isClosed.get() == false) { + replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); + assertMaxSeqNoInCommitUserData(replicaEngine); + } + } finally { + IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool)); } - IOUtils.close( - replicaEngine, storeReplica, - engine, store); - terminate(threadPool); } @@ -992,6 +997,21 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e } } + /** + * Asserts that the max_seq_no stored in the commit's user_data is never smaller than seq_no of any document in the commit. + */ + public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exception { + List commits = DirectoryReader.listCommits(engine.store.directory()); + for (IndexCommit commit : commits) { + try (DirectoryReader reader = DirectoryReader.open(commit)) { + AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get()))); + assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), + greaterThanOrEqualTo(maxSeqNoFromDocs.get())); + } + } + } + public static MapperService createMapperService(String type) throws IOException { IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(Settings.builder() diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index c5e5605079c4a..5e4e03fbf23f6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -888,7 +888,7 @@ public static boolean terminate(ExecutorService... services) throws InterruptedE return terminated; } - public static boolean terminate(ThreadPool threadPool) throws InterruptedException { + public static boolean terminate(ThreadPool threadPool) { return ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index c817cf41f46db..505cb04611c9b 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -197,13 +197,7 @@ public void tearDown() throws Exception { assertNoPendingHandshakes(serviceA.getOriginalTransport()); assertNoPendingHandshakes(serviceB.getOriginalTransport()); } finally { - IOUtils.close(serviceA, serviceB, () -> { - try { - terminate(threadPool); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); + IOUtils.close(serviceA, serviceB, () -> terminate(threadPool)); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 041fd1903ad0d..7d64462e9db49 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -70,6 +70,7 @@ private void preFlight(final Operation operation) { @Override protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); + markSeqNoAsSeen(index.seqNo()); // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers. final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; @@ -105,6 +106,7 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind @Override protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { preFlight(delete); + markSeqNoAsSeen(delete.seqNo()); if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) { // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index f09a9c2f6971b..e9ae2ba6157b5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -58,6 +58,8 @@ import java.util.stream.Collectors; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; +import static org.elasticsearch.index.engine.EngineTestCase.getTranslog; +import static org.elasticsearch.index.engine.EngineTestCase.getTranslog; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -640,4 +642,50 @@ public void testProcessOnceOnPrimary() throws Exception { } } } + + public void testMaxSeqNoInCommitUserData() throws Exception { + final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + try (FollowingEngine engine = createEngine(store, engineConfig)) { + AtomicBoolean running = new AtomicBoolean(true); + Thread rollTranslog = new Thread(() -> { + while (running.get() && getTranslog(engine).currentFileGeneration() < 500) { + engine.rollTranslogGeneration(); // make adding operations to translog slower + } + }); + rollTranslog.start(); + + Thread indexing = new Thread(() -> { + List ops = EngineTestCase.generateSingleDocHistory( + true, VersionType.EXTERNAL, false, 2, 50, 500, "id"); + engine.advanceMaxSeqNoOfUpdatesOrDeletes(ops.stream().mapToLong(Engine.Operation::seqNo).max().getAsLong()); + for (Engine.Operation op : ops) { + if (running.get() == false) { + return; + } + try { + EngineTestCase.applyOperation(engine, op); + } catch (IOException e) { + throw new AssertionError(e); + } + } + }); + indexing.start(); + + int numCommits = between(5, 20); + for (int i = 0; i < numCommits; i++) { + engine.flush(false, true); + } + running.set(false); + indexing.join(); + rollTranslog.join(); + EngineTestCase.assertMaxSeqNoInCommitUserData(engine); + } + } + } }