From 97b9d2bde74bedb90ab0df05440a10e509019156 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 9 May 2018 23:35:27 +0200 Subject: [PATCH] [CCR] Read changes from Lucene instead of translog (#30120) This commit adds an API to read a translog snapshot from Lucene, then cuts CCR over from the existing translog to the new API. Relates #30086 Relates #29530 --- .../elasticsearch/index/engine/Engine.java | 7 + .../index/engine/InternalEngine.java | 45 +++ .../index/engine/LuceneChangesSnapshot.java | 280 ++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 22 +- .../index/engine/InternalEngineTests.java | 36 --- .../engine/LuceneChangesSnapshotTests.java | 275 +++++++++++++++++ .../index/engine/EngineTestCase.java | 153 +++++----- .../index/shard/IndexShardTestCase.java | 17 +- .../xpack/ccr/FollowIndexIT.java | 4 + .../xpack/ccr/action/ShardChangesAction.java | 46 +-- .../xpack/ccr/ShardChangesIT.java | 108 +++++++ .../ccr/action/ShardChangesActionTests.java | 22 +- 12 files changed, 833 insertions(+), 182 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java create mode 100644 server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index d7e5054a7c830..372b65ee7df7d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -58,6 +58,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; @@ -609,6 +610,12 @@ public Translog.Location getTranslogLastWriteLocation() { return getTranslog().getLastWriteLocation(); } + /** + * Creates a new "translog" snapshot from Lucene for reading operations whose seqno is in the requested seqno range + */ + public abstract Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService, + long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException; + protected final void ensureOpen(Exception suppressed) { if (isClosed.get()) { AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get()); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 49cd5da2aec42..1b2037f738f2e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -69,6 +69,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.UidFieldMapper; @@ -152,6 +153,7 @@ public class InternalEngine extends Engine { private final CounterMetric numDocUpdates = new CounterMetric(); private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField(); private
final boolean softDeleteEnabled; + private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; /** * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this @@ -229,6 +231,8 @@ public InternalEngine(EngineConfig engineConfig) { for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) { this.internalSearcherManager.addListener(listener); } + this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint()); + this.internalSearcherManager.addListener(lastRefreshedCheckpointListener); success = true; } finally { if (success == false) { @@ -2402,6 +2406,23 @@ long getNumDocUpdates() { return numDocUpdates.count(); } + public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService, + long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException { + // TODO: Should we defer the refresh until we really need it? + ensureOpen(); + if (lastRefreshedCheckpoint() < maxSeqNo) { + refresh(source, SearcherScope.INTERNAL); + } + Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); + try { + LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, minSeqNo, maxSeqNo, requiredFullRange); + searcher = null; + return snapshot; + } finally { + IOUtils.close(searcher); + } + } + @Override public boolean isRecovering() { return pendingTranslogRecovery.get(); } @@ -2448,4 +2469,28 @@ public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends IndexableField>> docs, + /** + * Returns the last local checkpoint that has been refreshed internally. + */ + final long lastRefreshedCheckpoint() { + return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); + } + + private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener { + final AtomicLong refreshedCheckpoint; + private long pendingCheckpoint; + LastRefreshedCheckpointListener(long initialLocalCheckpoint) { + this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint); + } + @Override + public void beforeRefresh() { + // All operations up to this checkpoint are visible once the refresh completes. + pendingCheckpoint = localCheckpointTracker.getCheckpoint(); + } + @Override + public void afterRefresh(boolean didRefresh) { + if (didRefresh) { + refreshedCheckpoint.updateAndGet(prev -> Math.max(prev, pendingCheckpoint)); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java new file mode 100644 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -0,0 +1,280 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.ReaderUtil; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.TopDocs; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.fieldvisitor.FieldsVisitor; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.translog.Translog; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * A {@link Translog.Snapshot} whose operations are read from a Lucene index instead of the translog. + */ +final class LuceneChangesSnapshot implements Translog.Snapshot { + private final long fromSeqNo, toSeqNo; + private long lastSeenSeqNo; + private int skippedOperations; + private final boolean requiredFullRange; + + private final IndexSearcher indexSearcher; + private final MapperService mapperService; + private int docIndex; + private final TopDocs topDocs; + + private final Closeable onClose; + private final CombinedDocValues[] docValues; + + /** + * Creates a new "translog" snapshot from Lucene for reading operations whose seqno is in the requested range (both inclusive). + */ + LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, + long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { + if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { + throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); + } + this.mapperService = mapperService; + this.fromSeqNo = fromSeqNo; + this.toSeqNo = toSeqNo; + this.lastSeenSeqNo = fromSeqNo - 1; + this.requiredFullRange = requiredFullRange; + this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); + this.indexSearcher.setQueryCache(null); + this.topDocs = searchOperations(indexSearcher); + final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves(); + this.docValues = new CombinedDocValues[leaves.size()]; + for (LeafReaderContext leaf : leaves) { + this.docValues[leaf.ord] = new CombinedDocValues(leaf.reader()); + } + this.onClose = engineSearcher; + } + + @Override + public void close() throws IOException { + onClose.close(); + } + + @Override + public int totalOperations() { + return Math.toIntExact(topDocs.totalHits); + } + + @Override + public int overriddenOperations() { + return skippedOperations; + } + + @Override + public Translog.Operation next() throws IOException { + Translog.Operation op = null; + for (int docId = nextDocId(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = nextDocId()) { + op = readDocAsOp(docId); + if (op != null) { + break; + } + } + if (requiredFullRange) { + rangeCheck(op); + } + if (op != null) { + lastSeenSeqNo = op.seqNo(); + } + return op; + } + + private void rangeCheck(Translog.Operation op) { + if (op == null) { + if (lastSeenSeqNo < toSeqNo) { + throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " + + "and max_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]"); + } + } else { + final long expectedSeqNo = lastSeenSeqNo + 1; + if (op.seqNo() != expectedSeqNo) { + throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " + + "and max_seqno [" + toSeqNo + "] found; 
expected seqno [" + expectedSeqNo + "]; found [" + op + "]"); + } + } + } + + private int nextDocId() { + if (docIndex < topDocs.scoreDocs.length) { + final int docId = topDocs.scoreDocs[docIndex].doc; + docIndex++; + return docId; + } else { + return DocIdSetIterator.NO_MORE_DOCS; + } + } + + private TopDocs searchOperations(IndexSearcher searcher) throws IOException { + final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo); + final Sort sortedBySeqNoThenByTerm = new Sort( + new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG), + new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true) + ); + return searcher.search(rangeQuery, Integer.MAX_VALUE, sortedBySeqNoThenByTerm); + } + + private Translog.Operation readDocAsOp(int docID) throws IOException { + final List leaves = indexSearcher.getIndexReader().leaves(); + final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves)); + final int segmentDocID = docID - leaf.docBase; + final long primaryTerm = docValues[leaf.ord].docPrimaryTerm(segmentDocID); + // We don't have to read the nested child documents - those docs don't have primary terms. + if (primaryTerm == -1) { + skippedOperations++; + return null; + } + final long seqNo = docValues[leaf.ord].docSeqNo(segmentDocID); + // Only pick the first seen seq# + if (seqNo == lastSeenSeqNo) { + skippedOperations++; + return null; + } + final long version = docValues[leaf.ord].docVersion(segmentDocID); + final FieldsVisitor fields = new FieldsVisitor(true); + indexSearcher.doc(docID, fields); + fields.postProcess(mapperService); + + final Translog.Operation op; + final boolean isTombstone = docValues[leaf.ord].isTombstone(segmentDocID); + if (isTombstone && fields.uid() == null) { + op = new Translog.NoOp(seqNo, primaryTerm, ""); // TODO: store reason in ignored fields? + assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]"; + assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]"; + } else { + final String id = fields.uid().id(); + final String type = fields.uid().type(); + final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id)); + if (isTombstone) { + op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL); + assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]"; + } else { + final BytesReference source = fields.source(); + // TODO: pass the latest timestamp from engine. 
+ final long autoGeneratedIdTimestamp = -1; + op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL, + source.toBytesRef().bytes, fields.routing(), null, autoGeneratedIdTimestamp); + } + } + assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " + + "last_seen_seqno [" + lastSeenSeqNo + "], from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "], op [" + op + "]"; + return op; + } + + private boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException { + final NumericDocValues ndv = leafReader.getNumericDocValues(Lucene.SOFT_DELETE_FIELD); + if (ndv == null || ndv.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETE_FIELD + "] is not found"); + } + return ndv.longValue() == 1; + } + + private static final class CombinedDocValues { + private final LeafReader leafReader; + private NumericDocValues versionDV; + private NumericDocValues seqNoDV; + private NumericDocValues primaryTermDV; + private NumericDocValues tombstoneDV; + + CombinedDocValues(LeafReader leafReader) throws IOException { + this.leafReader = leafReader; + this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); + this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); + this.primaryTermDV = Objects.requireNonNull( + leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing"); + this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); + } + + long docVersion(int segmentDocId) throws IOException { + if (versionDV.docID() > segmentDocId) { + versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); + } + if (versionDV.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); + } + return versionDV.longValue(); + } + + long docSeqNo(int segmentDocId) throws IOException { + if (seqNoDV.docID() > segmentDocId) { + seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); + } + if (seqNoDV.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); + } + return seqNoDV.longValue(); + } + + long docPrimaryTerm(int segmentDocId) throws IOException { + if (primaryTermDV == null) { + return -1L; + } + if (primaryTermDV.docID() > segmentDocId) { + primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + } + // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. 
+ if (primaryTermDV.advanceExact(segmentDocId) == false) { + return -1; + } + return primaryTermDV.longValue(); + } + + boolean isTombstone(int segmentDocId) throws IOException { + if (tombstoneDV == null) { + return false; + } + if (tombstoneDV.docID() > segmentDocId) { + tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); + } + return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5c4e1d3ef8be2..369af043cae31 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1630,11 +1630,7 @@ public Closeable acquireTranslogRetentionLock() { * The caller has to close the returned snapshot after finishing the reading. */ public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { - return newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE); - } - - public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException { - return getEngine().newTranslogSnapshotBetween(minSeqNo, maxSeqNo); + return getEngine().newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE); } /** @@ -1644,6 +1640,22 @@ public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { return getEngine().estimateTranslogOperationsFromMinSeq(minSeqNo); } + /** + * Creates a new "translog" snapshot from Lucene for reading operations whose seqno is between minSeqNo and maxSeqNo. + * The caller must close the returned snapshot when done reading. + * + * @param source the source of the request + * @param minSeqNo the min_seqno to read - inclusive + * @param maxSeqNo the max_seqno to read - inclusive + * @param requiredFullRange if true then {@link Translog.Snapshot#next()} will throw {@link IllegalStateException} + * if any operation between minSeqNo and maxSeqNo is missing. This parameter should only be + * enabled when the requested range is below the global checkpoint. 
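+ * + * <p>A typical read of a batch of changes, mirroring how {@code ShardChangesAction} in this change uses the API + * (the seqno variables are illustrative): + * <pre>{@code + * try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, maxSeqNo, true)) { + * Translog.Operation op; + * while ((op = snapshot.next()) != null) { + * // process the operation + * } + * } + * }</pre>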
+ */ + public Translog.Snapshot newLuceneChangesSnapshot(String source, long minSeqNo, long maxSeqNo, + boolean requiredFullRange) throws IOException { + return getEngine().newLuceneChangesSnapshot(source, mapperService, minSeqNo, maxSeqNo, requiredFullRange); + } + public List<Segment> segments(boolean verbose) { return getEngine().segments(verbose); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5857f45326919..c05c480839d58 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1563,42 +1563,6 @@ public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, Interrup assertVisibleCount(engine, totalExpectedOps); } - private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException { - Thread[] thread = new Thread[randomIntBetween(3, 5)]; - CountDownLatch startGun = new CountDownLatch(thread.length); - AtomicInteger offset = new AtomicInteger(-1); - for (int i = 0; i < thread.length; i++) { - thread[i] = new Thread(() -> { - startGun.countDown(); - try { - startGun.await(); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - int docOffset; - while ((docOffset = offset.incrementAndGet()) < ops.size()) { - try { - final Engine.Operation op = ops.get(docOffset); - if (op instanceof Engine.Index) { - engine.index((Engine.Index) op); - } else { - engine.delete((Engine.Delete) op); - } - if ((docOffset + 1) % 4 == 0) { - engine.refresh("test"); - } - } catch (IOException e) { - throw new AssertionError(e); - } - } - }); - thread[i].start(); - } - for (int i = 0; i < thread.length; i++) { - thread[i].join(); - } - } - public void testInternalVersioningOnPrimary() throws IOException { final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20, "1"); assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java new file mode 100644 index 0000000000000..0b2116e1494d6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -0,0 +1,275 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.SnapshotMatchers; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.IndexSettingsModule; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class LuceneChangesSnapshotTests extends EngineTestCase { + private MapperService mapperService; + + @Before + public void createMapper() throws Exception { + mapperService = createMapperService("test"); + } + + public void testBasics() throws Exception { + long fromSeqNo = randomNonNegativeLong(); + long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); + // Empty engine + try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) { + IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); + assertThat(error.getMessage(), + containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found")); + } + try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false)) { + assertThat(snapshot, SnapshotMatchers.size(0)); + } + int numOps = between(1, 100); + int refreshedSeqNo = -1; + for (int i = 0; i < numOps; i++) { + String id = Integer.toString(randomIntBetween(i, i + 5)); + ParsedDocument doc = createParsedDoc(id, null); + if (randomBoolean()) { + engine.index(indexForDoc(doc)); + } else { + engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + } + if (rarely()) { + if (randomBoolean()) { + engine.flush(); + } else { + engine.refresh("test"); + } + refreshedSeqNo = i; + } + } + if (refreshedSeqNo == -1) { + fromSeqNo = between(0, numOps); + toSeqNo = randomLongBetween(fromSeqNo, numOps * 2); + + Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, false)) { + searcher = null; + assertThat(snapshot, SnapshotMatchers.size(0)); + } finally { + IOUtils.close(searcher); + } + + searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) { + searcher = null; + IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); + assertThat(error.getMessage(), + containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found")); + } finally { + IOUtils.close(searcher); + } + } else { + fromSeqNo = randomLongBetween(0, refreshedSeqNo); + toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2); + Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, 
fromSeqNo, toSeqNo, false)) { + searcher = null; + assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo)); + } finally { + IOUtils.close(searcher); + } + searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) { + searcher = null; + IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); + assertThat(error.getMessage(), + containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found")); + } finally { + IOUtils.close(searcher); + } + toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo); + searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) { + searcher = null; + assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); + } finally { + IOUtils.close(searcher); + } + } + // Getting the snapshot via the engine will auto-refresh first + fromSeqNo = randomLongBetween(0, numOps - 1); + toSeqNo = randomLongBetween(fromSeqNo, numOps - 1); + try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, randomBoolean())) { + assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); + } + } + + public void testDedupByPrimaryTerm() throws Exception { + Map<Long, Long> latestOperations = new HashMap<>(); + List<Integer> terms = Arrays.asList(between(1, 1000), between(1000, 2000)); + int totalOps = 0; + for (long term : terms) { + final List<Engine.Operation> ops = generateSingleDocHistory(true, + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE), false, term, 2, 20, "1"); + primaryTerm.set(Math.max(primaryTerm.get(), term)); + engine.rollTranslogGeneration(); + for (Engine.Operation op : ops) { + // We need to simulate a rollback here as only ops after the local checkpoint get into the engine + if (op.seqNo() <= engine.getLocalCheckpointTracker().getCheckpoint()) { + engine.getLocalCheckpointTracker().resetCheckpoint(randomLongBetween(-1, op.seqNo() - 1)); + engine.rollTranslogGeneration(); + } + if (op instanceof Engine.Index) { + engine.index((Engine.Index) op); + } else if (op instanceof Engine.Delete) { + engine.delete((Engine.Delete) op); + } + latestOperations.put(op.seqNo(), op.primaryTerm()); + if (rarely()) { + engine.refresh("test"); + } + if (rarely()) { + engine.flush(); + } + totalOps++; + } + } + long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); + try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, 0, maxSeqNo, false)) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo()))); + } + assertThat(snapshot.overriddenOperations(), equalTo(totalOps - latestOperations.size())); + } + } + + public void testUpdateAndReadChangesConcurrently() throws Exception { + Follower[] followers = new Follower[between(1, 3)]; + CountDownLatch readyLatch = new CountDownLatch(followers.length + 1); + AtomicBoolean isDone = new AtomicBoolean(); + for (int i = 0; i < followers.length; i++) { + followers[i] = new Follower(engine, isDone, readyLatch); + followers[i].start(); + } + boolean onPrimary = randomBoolean(); + List<Engine.Operation> operations = new ArrayList<>(); + int numOps = scaledRandomIntBetween(1, 1000); + for (int i = 0; i < 
numOps; i++) { + String id = Integer.toString(randomIntBetween(1, 10)); + ParsedDocument doc = createParsedDoc(id, randomAlphaOfLengthBetween(1, 5)); + final Engine.Operation op; + if (onPrimary) { + if (randomBoolean()) { + op = new Engine.Index(newUid(doc), primaryTerm.get(), doc); + } else { + op = new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()); + } + } else { + if (randomBoolean()) { + op = replicaIndexForDoc(doc, randomNonNegativeLong(), i, randomBoolean()); + } else { + op = replicaDeleteForDoc(doc.id(), randomNonNegativeLong(), i, randomNonNegativeLong()); + } + } + operations.add(op); + } + readyLatch.countDown(); + concurrentlyApplyOps(operations, engine); + assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), equalTo(operations.size() - 1L)); + isDone.set(true); + for (Follower follower : followers) { + follower.join(); + } + } + + class Follower extends Thread { + private final Engine leader; + private final TranslogHandler translogHandler; + private final AtomicBoolean isDone; + private final CountDownLatch readLatch; + + Follower(Engine leader, AtomicBoolean isDone, CountDownLatch readLatch) { + this.leader = leader; + this.isDone = isDone; + this.readLatch = readLatch; + this.translogHandler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(), + engine.engineConfig.getIndexSettings().getSettings())); + } + + void pullOperations(Engine follower) throws IOException { + long leaderCheckpoint = leader.getLocalCheckpointTracker().getCheckpoint(); + long followerCheckpoint = follower.getLocalCheckpointTracker().getCheckpoint(); + if (followerCheckpoint < leaderCheckpoint) { + long fromSeqNo = followerCheckpoint + 1; + long batchSize = randomLongBetween(0, 100); + long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint); + try (Translog.Snapshot snapshot = leader.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) { + translogHandler.run(follower, snapshot); + } + } + } + + @Override + public void run() { + try (Store store = createStore(); + InternalEngine follower = createEngine(store, createTempDir())) { + readLatch.countDown(); + readLatch.await(); + while (isDone.get() == false || + follower.getLocalCheckpointTracker().getCheckpoint() < leader.getLocalCheckpointTracker().getCheckpoint()) { + pullOperations(follower); + } + assertConsistentHistoryBetweenTranslogAndLuceneIndex(follower, mapperService); + assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + } + + private List<Translog.Operation> drainAll(Translog.Snapshot snapshot) throws IOException { + List<Translog.Operation> operations = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + final Translog.Operation newOp = op; + logger.error("Reading [{}]", op); + assert operations.stream().allMatch(o -> o.seqNo() < newOp.seqNo()) : "Operations [" + operations + "], op [" + op + "]"; + operations.add(newOp); + } + return operations; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 3bbefa799d9be..026c24d160023 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -22,29 +22,27 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.Analyzer; 
import org.apache.lucene.codecs.Codec; +import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; -import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; -import org.apache.lucene.search.SortField; -import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; @@ -67,7 +65,6 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -101,8 +98,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Function; @@ -116,7 +117,6 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; public abstract class EngineTestCase extends ESTestCase { @@ -568,7 +568,7 @@ protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long } protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long startTime) { - return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL, + return new Engine.Delete("test", id, newUid(id), seqNo, primaryTerm.get(), version, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, startTime); } protected static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException { @@ -716,80 +716,85 @@ public static void assertOpsOnReplica( } } - /** - * Reads all engine operations that have been processed by the engine from Lucene index. - * The returned operations are sorted and de-duplicated, thus each sequence number will be have at most one operation. 
- */ - public static List<Translog.Operation> readAllOperationsInLucene(Engine engine, MapperService mapper) throws IOException { - engine.refresh("test"); - final List<Translog.Operation> operations = new ArrayList<>(); - try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { - final IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); - final Sort sortedBySeqNoThenByTerm = new Sort( - new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG), - new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true) - ); - final TopDocs allDocs = indexSearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE, sortedBySeqNoThenByTerm); - long lastSeenSeqNo = SequenceNumbers.NO_OPS_PERFORMED; - for (ScoreDoc scoreDoc : allDocs.scoreDocs) { - final Translog.Operation op = readOperationInLucene(indexSearcher, mapper, scoreDoc.doc); - if (op.seqNo() != lastSeenSeqNo) { - operations.add(op); - lastSeenSeqNo = op.seqNo(); + protected void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException { + Thread[] thread = new Thread[randomIntBetween(3, 5)]; + CountDownLatch startGun = new CountDownLatch(thread.length); + AtomicInteger offset = new AtomicInteger(-1); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread(() -> { + startGun.countDown(); + try { + startGun.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); } - } + int docOffset; + while ((docOffset = offset.incrementAndGet()) < ops.size()) { + try { + final Engine.Operation op = ops.get(docOffset); + if (op instanceof Engine.Index) { + engine.index((Engine.Index) op); + } else if (op instanceof Engine.Delete) { + engine.delete((Engine.Delete) op); + } else { + engine.noOp((Engine.NoOp) op); + } + if ((docOffset + 1) % 4 == 0) { + engine.refresh("test"); + } + if (rarely()) { + engine.flush(); + } + } catch (IOException e) { + throw new AssertionError(e); + } + } + }); + thread[i].start(); } - return operations; - } - - private static Translog.Operation readOperationInLucene(IndexSearcher searcher, MapperService mapper, int docID) throws IOException { - final List<LeafReaderContext> leaves = searcher.getIndexReader().leaves(); - final int leafIndex = ReaderUtil.subIndex(docID, leaves); - final int segmentDocID = docID - leaves.get(leafIndex).docBase; - final long seqNo = readNumericDV(leaves.get(leafIndex), SeqNoFieldMapper.NAME, segmentDocID); - final long primaryTerm = readNumericDV(leaves.get(leafIndex), SeqNoFieldMapper.PRIMARY_TERM_NAME, segmentDocID); - final long version = readNumericDV(leaves.get(leafIndex), VersionFieldMapper.NAME, segmentDocID); - final FieldsVisitor fields = new FieldsVisitor(true); - searcher.doc(docID, fields); - fields.postProcess(mapper); - final Translog.Operation op; - final boolean isTombstone = isTombstoneOperation(leaves.get(leafIndex), segmentDocID); - if (isTombstone && fields.uid() == null) { - op = new Translog.NoOp(seqNo, primaryTerm, ""); - assert readNumericDV(leaves.get(leafIndex), Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1 - : "Noop operation but soft_deletes field is not set"; - assert version == 1 : "Noop tombstone should have version 1L; actual version [" + version + "]"; - } else { - final String id = fields.uid().id(); - final String type = fields.uid().type(); - final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id)); - if (isTombstone) { - op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL); - assert 
readNumericDV(leaves.get(leafIndex), Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1 - : "Delete operation but soft_deletes field is not set"; - } else { - final BytesReference source = fields.source(); - op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL, source.toBytesRef().bytes, - fields.routing(), null, -1); - } + for (int i = 0; i < thread.length; i++) { + thread[i].join(); } - return op; } - private static boolean isTombstoneOperation(LeafReaderContext leaf, int segmentDocID) throws IOException { - final NumericDocValues tombstoneDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); - if (tombstoneDV != null && tombstoneDV.advanceExact(segmentDocID)) { - return tombstoneDV.longValue() == 1; + /** + * Gets all doc ids from the given engine. + */ + public static Set<String> getDocIds(Engine engine, boolean refresh) throws IOException { + if (refresh) { + engine.refresh("test_get_doc_ids"); + } + try (Engine.Searcher searcher = engine.acquireSearcher("test_get_doc_ids")) { + Set<String> ids = new HashSet<>(); + for (LeafReaderContext leafContext : searcher.reader().leaves()) { + LeafReader reader = leafContext.reader(); + Bits liveDocs = reader.getLiveDocs(); + for (int i = 0; i < reader.maxDoc(); i++) { + if (liveDocs == null || liveDocs.get(i)) { + Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME)); + BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME); + ids.add(Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length))); + } + } + } + return ids; } - return false; } - private static long readNumericDV(LeafReaderContext leaf, String field, int segmentDocID) throws IOException { - final NumericDocValues dv = leaf.reader().getNumericDocValues(field); - if (dv == null || dv.advanceExact(segmentDocID) == false) { - throw new IllegalStateException("DocValues for field [" + field + "] is not found"); + /** + * Reads all engine operations that have been processed by the engine from the Lucene index. + * The returned operations are sorted and de-duplicated, thus each sequence number will have at most one operation. 
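+ * Operations are read via {@link Engine#newLuceneChangesSnapshot} from seq# 0 up to the engine's max seq#.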
+ */ + public static List<Translog.Operation> readAllOperationsInLucene(Engine engine, MapperService mapper) throws IOException { + final List<Translog.Operation> operations = new ArrayList<>(); + long maxSeqNo = Math.max(0, engine.getLocalCheckpointTracker().getMaxSeqNo()); + try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapper, 0, maxSeqNo, false)) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + operations.add(op); + } } - return dv.longValue(); + return operations; } /** @@ -822,7 +827,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e } } assertThat(luceneOp, notNullValue()); - assertThat(luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm())); + assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm())); assertThat(luceneOp.opType(), equalTo(translogOp.opType())); if (luceneOp.opType() == Translog.Operation.Type.INDEX) { assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 447b9c535a0a3..6a5783d8f2b95 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -519,22 +519,7 @@ private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) th } protected Set<String> getShardDocUIDs(final IndexShard shard) throws IOException { - shard.refresh("get_uids"); - try (Engine.Searcher searcher = shard.acquireSearcher("test")) { - Set<String> ids = new HashSet<>(); - for (LeafReaderContext leafContext : searcher.reader().leaves()) { - LeafReader reader = leafContext.reader(); - Bits liveDocs = reader.getLiveDocs(); - for (int i = 0; i < reader.maxDoc(); i++) { - if (liveDocs == null || liveDocs.get(i)) { - Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME)); - BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME); - ids.add(Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length))); - } - } - } - return ids; - } + return EngineTestCase.getDocIds(shard.getEngine(), true); } protected void assertDocCount(IndexShard shard, int docCount) throws IOException { diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 09556bf748726..ed3413969b396 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -44,6 +44,10 @@ public void testFollowIndex() throws Exception { final String leaderIndexName = "test_index1"; if (runningAgainstLeaderCluster) { logger.info("Running against leader cluster"); + Settings indexSettings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + createIndex(leaderIndexName, indexSettings); for (int i = 0; i < numDocs; i++) { logger.info("Indexing doc [{}]", i); index(client(), leaderIndexName, Integer.toString(i), "field", i); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index d5c774f53a0ed..0e61521f95f21 
100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -34,11 +34,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.List; import java.util.Objects; -import java.util.PriorityQueue; -import java.util.Queue; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -227,6 +224,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); IndexShard indexShard = indexService.getShard(request.getShard().id()); + request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint()); return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes); } @@ -256,42 +254,18 @@ static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long if (indexShard.state() != IndexShardState.STARTED) { throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); } - - long seenBytes = 0; - long nextExpectedSeqNo = minSeqNo; - final Queue<Translog.Operation> orderedOps = new PriorityQueue<>(Comparator.comparingLong(Translog.Operation::seqNo)); - + int seenBytes = 0; final List<Translog.Operation> operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = indexShard.newTranslogSnapshotBetween(minSeqNo, maxSeqNo)) { - for (Translog.Operation unorderedOp = snapshot.next(); unorderedOp != null; unorderedOp = snapshot.next()) { - if (unorderedOp.seqNo() < minSeqNo || unorderedOp.seqNo() > maxSeqNo) { - continue; - } - - orderedOps.add(unorderedOp); - while (orderedOps.peek() != null && orderedOps.peek().seqNo() == nextExpectedSeqNo) { - Translog.Operation orderedOp = orderedOps.poll(); - if (seenBytes < byteLimit) { - nextExpectedSeqNo++; - seenBytes += orderedOp.estimateSize(); - operations.add(orderedOp); - if (nextExpectedSeqNo > maxSeqNo) { - return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY)); - } - } else { - return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY)); - } + try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, maxSeqNo, true)) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + operations.add(op); + seenBytes += op.estimateSize(); + if (seenBytes > byteLimit) { + break; } } } - - if (nextExpectedSeqNo >= maxSeqNo) { - return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY)); - } else { - String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo + - "] found, tracker checkpoint [" + nextExpectedSeqNo + "]"; - throw new IllegalStateException(message); - } + return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY)); } - } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 807bdc5dda6f1..12249937e9e1b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -17,6 +17,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.support.XContentMapValues; 
+import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -218,6 +220,61 @@ public void testFollowIndex() throws Exception { }); } + public void testFollowIndexWithNestedField() throws Exception { + final String leaderIndexSettings = + getIndexSettingsWithNestedMapping(1, Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + + final String followerIndexSettings = + getIndexSettingsWithNestedMapping(1, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); + + ensureGreen("index1", "index2"); + + final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get(); + + final int numDocs = randomIntBetween(2, 64); + for (int i = 0; i < numDocs; i++) { + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.field("field", "value"); + builder.startArray("objects"); + { + builder.startObject(); + builder.field("field", i); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(builder).get(); + } + } + + for (int i = 0; i < numDocs; i++) { + int value = i; + assertBusy(() -> { + final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get(); + assertTrue(getResponse.isExists()); + assertTrue(getResponse.getSource().containsKey("field")); + assertThat(XContentMapValues.extractValue("objects.field", getResponse.getSource()), + equalTo(Collections.singletonList(value))); + }); + } + + final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); + unfollowRequest.setFollowIndex("index2"); + client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); + + assertBusy(() -> { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks().size(), equalTo(0)); + }); + } + public void testFollowNonExistentIndex() throws Exception { assertAcked(client().admin().indices().prepareCreate("test-leader").get()); assertAcked(client().admin().indices().prepareCreate("test-follower").get()); @@ -318,4 +375,55 @@ private String getIndexSettings(final int numberOfPrimaryShards, final Map<String, String> additionalIndexSettings) + private String getIndexSettingsWithNestedMapping(final int numberOfPrimaryShards, + final Map<String, String> additionalIndexSettings) throws IOException { + final String settings; + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("settings"); + { + builder.field("index.number_of_shards", numberOfPrimaryShards); + for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) { + builder.field(additionalSetting.getKey(), additionalSetting.getValue()); + } + } + builder.endObject(); + builder.startObject("mappings"); + { + builder.startObject("doc"); + { + builder.startObject("properties"); + { + builder.startObject("objects"); + { + builder.field("type", "nested"); + 
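// A nested field maps each document onto several Lucene docs, of which only the root doc carries a primary term; this exercises the child-doc skipping in LuceneChangesSnapshot#readDocAsOp. + 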
builder.startObject("properties"); + { + builder.startObject("field"); + { + builder.field("type", "long"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + builder.startObject("field"); + { + builder.field("type", "keyword"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + settings = BytesReference.bytes(builder).utf8ToString(); + } + return settings; + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index b91c43d74e757..27081f3a1e60c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -9,8 +9,6 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; @@ -20,7 +18,7 @@ import org.mockito.Mockito; import java.util.Arrays; -import java.util.Set; +import java.util.List; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -33,7 +31,6 @@ public void testGetOperationsBetween() throws Exception { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) - .put("index.translog.generation_threshold_size", new ByteSizeValue(randomIntBetween(8, 64), ByteSizeUnit.KB)) .build(); final IndexService indexService = createIndex("index", settings); @@ -48,28 +45,23 @@ public void testGetOperationsBetween() throws Exception { for (int iter = 0; iter < iters; iter++) { int min = randomIntBetween(0, numWrites - 1); int max = randomIntBetween(min, numWrites - 1); - final ShardChangesAction.Response r = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE); - /* - * We are not guaranteed that operations are returned to us in order they are in the translog (if our read crosses multiple - * generations) so the best we can assert is that we see the expected operations. 
- */ - final Set<Long> seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toSet()); - final Set<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toSet()); + final List<Long> seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toList()); + final List<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toList()); assertThat(seenSeqNos, equalTo(expectedSeqNos)); } // get operations for a range where no operations exist: Exception e = expectThrows(IllegalStateException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE)); - assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + numWrites + "] and max_seq_no [" + - (numWrites + 1) + "] found, tracker checkpoint [")); + assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + numWrites + "] and max_seqno [" + + (numWrites + 1) + "] found")); // get operations for a range where some operations do not exist: e = expectThrows(IllegalStateException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE)); - assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + (numWrites - 10) + "] and max_seq_no [" + - (numWrites + 10) + "] found, tracker checkpoint [")); + assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + (numWrites - 10) + "] and max_seqno [" + + (numWrites + 10) + "] found")); } public void testGetOperationsBetweenWhenShardNotStarted() throws Exception {