diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 1cf18cb4ee5c8..4299fa0cb6ea3 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -572,7 +572,11 @@ public CommitStats commitStats() { return new CommitStats(getLastCommittedSegmentInfos()); } - /** get the sequence number service */ + /** + * The sequence number service for this engine. + * + * @return the sequence number service + */ public abstract SequenceNumbersService seqNoService(); /** diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2d0842ba32e80..39ddc02629b51 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -145,6 +145,12 @@ public class InternalEngine extends Engine { private final String historyUUID; public InternalEngine(EngineConfig engineConfig) { + this(engineConfig, InternalEngine::sequenceNumberService); + } + + InternalEngine( + final EngineConfig engineConfig, + final BiFunction seqNoServiceSupplier) { super(engineConfig); openMode = engineConfig.getOpenMode(); if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { @@ -152,11 +158,11 @@ public InternalEngine(EngineConfig engineConfig) { } this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME; final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( - engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), - engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() ); this.deletionPolicy = new CombinedDeletionPolicy( - new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); + new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); store.incRef(); IndexWriter writer = null; Translog translog = null; @@ -184,20 +190,20 @@ public InternalEngine(EngineConfig engineConfig) { case CREATE_INDEX_AND_TRANSLOG: writer = createWriter(true); seqNoStats = new SeqNoStats( - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO); + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.UNASSIGNED_SEQ_NO); break; default: throw new IllegalArgumentException(openMode.toString()); } logger.trace("recovered [{}]", seqNoStats); - seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats); + seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID()); Objects.requireNonNull(historyUUID, "history uuid should not be null"); indexWriter = writer; - translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint()); + translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService.getGlobalCheckpoint()); assert translog.getGeneration() != null; // we can only do this after we generated and committed a translog uuid. other wise the combined // retention policy, which listens to commits, gets all confused. @@ -243,12 +249,12 @@ public InternalEngine(EngineConfig engineConfig) { public void restoreLocalCheckpointFromTranslog() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); - final long localCheckpoint = seqNoService().getLocalCheckpoint(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() > localCheckpoint) { - seqNoService().markSeqNoAsCompleted(operation.seqNo()); + seqNoService.markSeqNoAsCompleted(operation.seqNo()); } } } @@ -259,17 +265,17 @@ public void restoreLocalCheckpointFromTranslog() throws IOException { public int fillSeqNoGaps(long primaryTerm) throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); - final long localCheckpoint = seqNoService().getLocalCheckpoint(); - final long maxSeqNo = seqNoService().getMaxSeqNo(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); + final long maxSeqNo = seqNoService.getMaxSeqNo(); int numNoOpsAdded = 0; for ( long seqNo = localCheckpoint + 1; seqNo <= maxSeqNo; - seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) { + seqNo = seqNoService.getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) { innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps")); numNoOpsAdded++; - assert seqNo <= seqNoService().getLocalCheckpoint() - : "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]"; + assert seqNo <= seqNoService.getLocalCheckpoint() + : "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService.getLocalCheckpoint() + "]"; } return numNoOpsAdded; @@ -287,15 +293,13 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp)); } - private static SequenceNumbersService sequenceNumberService( - final ShardId shardId, - final String allocationId, - final IndexSettings indexSettings, + static SequenceNumbersService sequenceNumberService( + final EngineConfig engineConfig, final SeqNoStats seqNoStats) { return new SequenceNumbersService( - shardId, - allocationId, - indexSettings, + engineConfig.getShardId(), + engineConfig.getAllocationId(), + engineConfig.getIndexSettings(), seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), seqNoStats.getGlobalCheckpoint()); @@ -634,8 +638,7 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" + " index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo; } else if (origin == Operation.Origin.PRIMARY) { - // sequence number should not be set when operation origin is primary - assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "primary ops should never have an assigned seq no.; seqNo: " + seqNo; + assert assertOriginPrimarySequenceNumber(seqNo); } else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) { // sequence number should be set when operation origin is not primary assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin; @@ -643,6 +646,13 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi return true; } + protected boolean assertOriginPrimarySequenceNumber(final long seqNo) { + // sequence number should not be set when operation origin is primary + assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO + : "primary operations must never have an assigned sequence number but was [" + seqNo + "]"; + return true; + } + private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) || origin == Operation.Origin.PRIMARY) { @@ -652,6 +662,20 @@ private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin return true; } + private long generateSeqNoForOperation(final Operation operation) { + assert operation.origin() == Operation.Origin.PRIMARY; + return doGenerateSeqNoForOperation(operation); + } + + /** + * Generate the sequence number for the specified operation. + * + * @param operation the operation + * @return the sequence number + */ + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoService.generateSeqNo(); + } @Override public IndexResult index(Index index) throws IOException { @@ -721,7 +745,7 @@ public IndexResult index(Index index) throws IOException { indexResult.setTranslogLocation(location); } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(indexResult.getSeqNo()); + seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo()); } indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); @@ -741,14 +765,12 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio final IndexingStrategy plan; if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) { // no need to deal with out of order delivery - we never saw this one - assert index.version() == 1L : - "can optimize on replicas but incoming version is [" + index.version() + "]"; + assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); } else { // drop out of order operations assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() : - "resolving out of order delivery based on versioning but version type isn't fit for it. got [" - + index.versionType() + "]"; + "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]"; // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity @@ -783,15 +805,14 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 } private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { - assert index.origin() == Operation.Origin.PRIMARY : - "planing as primary but origin isn't. got " + index.origin(); + assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay if (canOptimizeAddDocument(index)) { if (mayHaveBeenIndexedBefore(index)) { - plan = IndexingStrategy.overrideExistingAsIfNotThere(seqNoService().generateSeqNo(), 1L); + plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); } else { - plan = IndexingStrategy.optimizedAppendOnly(seqNoService().generateSeqNo()); + plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); } } else { // resolves incoming version @@ -812,7 +833,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); } else { plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, - seqNoService().generateSeqNo(), + generateSeqNoForOperation(index), index.versionType().updateVersion(currentVersion, index.version()) ); } @@ -1021,7 +1042,7 @@ public DeleteResult delete(Delete delete) throws IOException { deleteResult.setTranslogLocation(location); } if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo()); + seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo()); } deleteResult.setTook(System.nanoTime() - delete.startTime()); deleteResult.freeze(); @@ -1038,8 +1059,7 @@ public DeleteResult delete(Delete delete) throws IOException { } private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { - assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " - + delete.origin(); + assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); // drop out of order operations assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : "resolving out of order delivery based on versioning but version type isn't fit for it. got [" @@ -1077,8 +1097,7 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 } private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { - assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " - + delete.origin(); + assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); // resolve operation from external to internal final VersionValue versionValue = resolveDocVersion(delete); assert incrementVersionLookup(); @@ -1096,9 +1115,10 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); } else { - plan = DeletionStrategy.processNormally(currentlyDeleted, - seqNoService().generateSeqNo(), - delete.versionType().updateVersion(currentVersion, delete.version())); + plan = DeletionStrategy.processNormally( + currentlyDeleted, + generateSeqNoForOperation(delete), + delete.versionType().updateVersion(currentVersion, delete.version())); } return plan; } @@ -1199,7 +1219,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { return noOpResult; } finally { if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(seqNo); + seqNoService.markSeqNoAsCompleted(seqNo); } } } @@ -1932,13 +1952,13 @@ protected void doRun() throws Exception { * @throws IOException if an I/O exception occurs committing the specfied writer */ protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException { - final long localCheckpoint = seqNoService().getLocalCheckpoint(); - final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); - final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); - final String translogUUID = translogGeneration.translogUUID; - final String localCheckpointValue = Long.toString(localCheckpoint); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); + final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); + final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); + final String translogUUID = translogGeneration.translogUUID; + final String localCheckpointValue = Long.toString(localCheckpoint); - final Iterable> commitIterable = () -> { + final Iterable> commitIterable = () -> { /* * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes * segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want @@ -1948,19 +1968,19 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). */ - final Map commitData = new HashMap<>(6); - commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration); - commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue); - if (syncId != null) { - commitData.put(Engine.SYNC_COMMIT_ID, syncId); - } - commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); - commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); - commitData.put(HISTORY_UUID_KEY, historyUUID); - logger.trace("committing writer with commit data [{}]", commitData); - return commitData.entrySet().iterator(); - }; + final Map commitData = new HashMap<>(6); + commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration); + commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue); + if (syncId != null) { + commitData.put(Engine.SYNC_COMMIT_ID, syncId); + } + commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService.getMaxSeqNo())); + commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); + commitData.put(HISTORY_UUID_KEY, historyUUID); + logger.trace("committing writer with commit data [{}]", commitData); + return commitData.entrySet().iterator(); + }; commitIndexWriter(writer, commitIterable); } @@ -2036,8 +2056,7 @@ public MergeStats getMergeStats() { return mergeScheduler.stats(); } - @Override - public SequenceNumbersService seqNoService() { + public final SequenceNumbersService seqNoService() { return seqNoService; } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 1c8911a0cd886..1c0b320558400 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -66,7 +66,7 @@ public SequenceNumbersService( * * @return the next assigned sequence number */ - public long generateSeqNo() { + public final long generateSeqNo() { return localCheckpointTracker.generateSeqNo(); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d2b15c0a113d9..f6d2d99658098 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.filter.RegexFilter; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat; import org.apache.lucene.document.Field; @@ -95,46 +94,36 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.analysis.AnalyzerScope; -import org.elasticsearch.index.analysis.IndexAnalyzers; -import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine.Searcher; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.ContentPath; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.DocumentMapperForType; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.Mapper.BuilderContext; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexSearcherWrapper; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; -import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.indices.IndicesModule; -import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -171,9 +160,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.function.ToLongBiFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -184,7 +173,6 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; -import static org.elasticsearch.index.mapper.SourceToParse.source; import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; @@ -356,11 +344,19 @@ protected InternalEngine createEngine(Store store, Path translogPath) throws IOE return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); } - protected InternalEngine createEngine(Store store, Path translogPath, - Function sequenceNumbersServiceSupplier) throws IOException { + protected InternalEngine createEngine(Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier) throws IOException { return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier); } + protected InternalEngine createEngine(Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier, + ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null); + } + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { return createEngine(indexSettings, store, translogPath, mergePolicy, null); @@ -377,8 +373,19 @@ protected InternalEngine createEngine( Path translogPath, MergePolicy mergePolicy, @Nullable IndexWriterFactory indexWriterFactory, - @Nullable Function sequenceNumbersServiceSupplier) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null); + @Nullable BiFunction sequenceNumbersServiceSupplier) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, null); } protected InternalEngine createEngine( @@ -387,10 +394,11 @@ protected InternalEngine createEngine( Path translogPath, MergePolicy mergePolicy, @Nullable IndexWriterFactory indexWriterFactory, - @Nullable Function sequenceNumbersServiceSupplier, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation, @Nullable Sort indexSort) throws IOException { EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); - InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config); + InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config); if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); } @@ -404,21 +412,39 @@ public interface IndexWriterFactory { } public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory, - @Nullable final Function sequenceNumbersServiceSupplier, + @Nullable final BiFunction sequenceNumbersServiceSupplier, + @Nullable final ToLongBiFunction seqNoForOperation, final EngineConfig config) { - return new InternalEngine(config) { - @Override - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return (indexWriterFactory != null) ? - indexWriterFactory.createWriter(directory, iwc) : - super.createWriter(directory, iwc); - } + if (sequenceNumbersServiceSupplier == null) { + return new InternalEngine(config) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } + + @Override + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); + } + }; + } else { + return new InternalEngine(config, sequenceNumbersServiceSupplier) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } @Override - public SequenceNumbersService seqNoService() { - return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.apply(config) : super.seqNoService(); + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); } }; + } + } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, @@ -672,8 +698,8 @@ public void testSegmentsWithMergeFlag() throws Exception { public void testSegmentsWithIndexSort() throws Exception { Sort indexSort = new Sort(new SortedSetSortField("_type", false)); try (Store store = createStore(); - Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - null, null, indexSort)) { + Engine engine = + createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, null, indexSort)) { List segments = engine.segments(true); assertThat(segments.isEmpty(), equalTo(true)); @@ -729,13 +755,28 @@ public void testCommitStats() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); try ( Store store = createStore(); - InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService( - config.getShardId(), - config.getAllocationId(), - config.getIndexSettings(), - maxSeqNo.get(), - localCheckpoint.get(), - globalCheckpoint.get()) + InternalEngine engine = createEngine(store, createTempDir(), (config, seqNoStats) -> new SequenceNumbersService( + config.getShardId(), + config.getAllocationId(), + config.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()) { + @Override + public long getMaxSeqNo() { + return maxSeqNo.get(); + } + + @Override + public long getLocalCheckpoint() { + return localCheckpoint.get(); + } + + @Override + public long getGlobalCheckpoint() { + return globalCheckpoint.get(); + } + } )) { CommitStats stats1 = engine.commitStats(); assertThat(stats1.getGeneration(), greaterThan(0L)); @@ -902,20 +943,11 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { Store store = createStore(); final AtomicInteger counter = new AtomicInteger(); try { - initialEngine = createEngine(store, createTempDir(), (config) -> - new SequenceNumbersService( - config.getShardId(), - config.getAllocationId(), - config.getIndexSettings(), - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO) { - @Override - public long generateSeqNo() { - return seqNos.get(counter.getAndIncrement()); - } - } - ); + initialEngine = createEngine( + store, + createTempDir(), + InternalEngine::sequenceNumberService, + (engine, operation) -> seqNos.get(counter.getAndIncrement())); for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); @@ -2711,7 +2743,7 @@ public void testTranslogReplay() throws IOException { assertVisibleCount(engine, numDocs, false); parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(numDocs, parser.appliedOperations.get()); + assertEquals(numDocs, parser.appliedOperations()); if (parser.mappingUpdate != null) { assertEquals(1, parser.getRecoveredTypes().size()); assertTrue(parser.getRecoveredTypes().containsKey("test")); @@ -2723,7 +2755,7 @@ public void testTranslogReplay() throws IOException { engine = createEngine(store, primaryTranslogDir); assertVisibleCount(engine, numDocs, false); parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(0, parser.appliedOperations.get()); + assertEquals(0, parser.appliedOperations()); final boolean flush = randomBoolean(); int randomId = randomIntBetween(numDocs + 1, numDocs + 10); @@ -2753,7 +2785,7 @@ public void testTranslogReplay() throws IOException { assertThat(topDocs.totalHits, equalTo(numDocs + 1L)); } parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(flush ? 1 : 2, parser.appliedOperations.get()); + assertEquals(flush ? 1 : 2, parser.appliedOperations()); engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc))); if (randomBoolean()) { engine.refresh("test"); @@ -2767,97 +2799,6 @@ public void testTranslogReplay() throws IOException { } } - public static class TranslogHandler implements EngineConfig.TranslogRecoveryRunner { - - private final MapperService mapperService; - public Mapping mappingUpdate = null; - private final Map recoveredTypes = new HashMap<>(); - private final AtomicLong appliedOperations = new AtomicLong(); - - public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) { - NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer()); - IndexAnalyzers indexAnalyzers = new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, Collections.emptyMap(), Collections.emptyMap()); - SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap()); - MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); - mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry, - () -> null); - } - - private DocumentMapperForType docMapper(String type) { - RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type); - DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService); - return new DocumentMapperForType(b.build(mapperService), mappingUpdate); - } - - private void applyOperation(Engine engine, Engine.Operation operation) throws IOException { - switch (operation.operationType()) { - case INDEX: - Engine.Index engineIndex = (Engine.Index) operation; - Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate(); - if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) { - recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false)); - } - engine.index(engineIndex); - break; - case DELETE: - engine.delete((Engine.Delete) operation); - break; - case NO_OP: - engine.noOp((Engine.NoOp) operation); - break; - default: - throw new IllegalStateException("No operation defined for [" + operation + "]"); - } - } - - /** - * Returns the recovered types modifying the mapping during the recovery - */ - public Map getRecoveredTypes() { - return recoveredTypes; - } - - @Override - public int run(Engine engine, Translog.Snapshot snapshot) throws IOException { - int opsRecovered = 0; - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY)); - opsRecovered++; - appliedOperations.incrementAndGet(); - } - return opsRecovered; - } - - private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { - switch (operation.opType()) { - case INDEX: - final Translog.Index index = (Translog.Index) operation; - final String indexName = mapperService.index().getName(); - final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), - mapperService.getIndexSettings().getIndexVersionCreated(), - source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) - .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), - index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, - index.getAutoGeneratedIdTimestamp(), true); - return engineIndex; - case DELETE: - final Translog.Delete delete = (Translog.Delete) operation; - final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), - origin, System.nanoTime()); - return engineDelete; - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) operation; - final Engine.NoOp engineNoOp = - new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason()); - return engineNoOp; - default: - throw new IllegalStateException("No operation defined for [" + operation + "]"); - } - } - } - public void testRecoverFromForeignTranslog() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { @@ -3786,47 +3727,38 @@ public void testSequenceIDs() throws Exception { } /** - * A sequence number service that will generate a sequence number and if {@code stall} is set to {@code true} will wait on the barrier - * and the referenced latch before returning. If the local checkpoint should advance (because {@code stall} is {@code false}), then the - * value of {@code expectedLocalCheckpoint} is set accordingly. + * A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the + * referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of + * {@code expectedLocalCheckpoint} is set accordingly. * * @param latchReference to latch the thread for the purpose of stalling * @param barrier to signal the thread has generated a new sequence number * @param stall whether or not the thread should stall * @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence * number - * @return a sequence number service + * @return a sequence number generator */ - private SequenceNumbersService getStallingSeqNoService( + private ToLongBiFunction getStallingSeqNoGenerator( final AtomicReference latchReference, final CyclicBarrier barrier, final AtomicBoolean stall, final AtomicLong expectedLocalCheckpoint) { - return new SequenceNumbersService( - shardId, - allocationId.getId(), - defaultSettings, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO) { - @Override - public long generateSeqNo() { - final long seqNo = super.generateSeqNo(); - final CountDownLatch latch = latchReference.get(); - if (stall.get()) { - try { - barrier.await(); - latch.await(); - } catch (BrokenBarrierException | InterruptedException e) { - throw new RuntimeException(e); - } - } else { - if (expectedLocalCheckpoint.get() + 1 == seqNo) { - expectedLocalCheckpoint.set(seqNo); - } + return (engine, operation) -> { + final long seqNo = engine.seqNoService().generateSeqNo(); + final CountDownLatch latch = latchReference.get(); + if (stall.get()) { + try { + barrier.await(); + latch.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { + if (expectedLocalCheckpoint.get() + 1 == seqNo) { + expectedLocalCheckpoint.set(seqNo); } - return seqNo; } + return seqNo; }; } @@ -3840,8 +3772,8 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro final AtomicBoolean stall = new AtomicBoolean(); final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final List threads = new ArrayList<>(); - final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint); - initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService); + initialEngine = + createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint)); final InternalEngine finalInitialEngine = initialEngine; for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); @@ -4015,17 +3947,17 @@ public void testNoOps() throws IOException { final int localCheckpoint = randomIntBetween(0, maxSeqNo); final int globalCheckpoint = randomIntBetween(0, localCheckpoint); try { - final SequenceNumbersService seqNoService = - new SequenceNumbersService(shardId, allocationId.getId(), defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) { - @Override - public long generateSeqNo() { - throw new UnsupportedOperationException(); - } - }; - noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { + final BiFunction supplier = (engineConfig, ignored) -> new SequenceNumbersService( + engineConfig.getShardId(), + engineConfig.getAllocationId(), + engineConfig.getIndexSettings(), + maxSeqNo, + localCheckpoint, + globalCheckpoint); + noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier) { @Override - public SequenceNumbersService seqNoService() { - return seqNoService; + protected long doGenerateSeqNoForOperation(Operation operation) { + throw new UnsupportedOperationException(); } }; noOpEngine.recoverFromTranslog(); @@ -4070,8 +4002,8 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti final AtomicBoolean stall = new AtomicBoolean(); final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final Map threads = new LinkedHashMap<>(); - final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint); - actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService); + actualEngine = + createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint)); final InternalEngine finalActualEngine = actualEngine; final Translog translog = finalActualEngine.getTranslog(); final long generation = finalActualEngine.getTranslog().currentFileGeneration(); @@ -4160,26 +4092,20 @@ public void testRestoreLocalCheckpointFromTranslog() throws IOException { InternalEngine actualEngine = null; try { final Set completedSeqNos = new HashSet<>(); - final SequenceNumbersService seqNoService = - new SequenceNumbersService( - shardId, - allocationId.getId(), - defaultSettings, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO) { + final BiFunction supplier = (engineConfig, seqNoStats) -> new SequenceNumbersService( + engineConfig.getShardId(), + engineConfig.getAllocationId(), + engineConfig.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()) { @Override public void markSeqNoAsCompleted(long seqNo) { super.markSeqNoAsCompleted(seqNo); completedSeqNos.add(seqNo); } }; - actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { - @Override - public SequenceNumbersService seqNoService() { - return seqNoService; - } - }; + actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier); final int operations = randomIntBetween(0, 1024); final Set expectedCompletedSeqNos = new HashSet<>(); for (int i = 0; i < operations; i++) { @@ -4347,4 +4273,71 @@ public void testRefreshScopedSearcher() throws IOException { assertSameReader(getSearcher, searchSearcher); } } + + public void testSeqNoGenerator() throws IOException { + engine.close(); + final long seqNo = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE); + final BiFunction seqNoService = (config, seqNoStats) -> new SequenceNumbersService( + config.getShardId(), + config.getAllocationId(), + config.getIndexSettings(), + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong seqNoGenerator = new AtomicLong(seqNo); + try (Engine e = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, seqNoService, (engine, operation) -> seqNoGenerator.getAndIncrement())) { + final String id = "id"; + final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); + final String type = "type"; + final Field versionField = new NumericDocValuesField("_version", 0); + final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + final ParseContext.Document document = new ParseContext.Document(); + document.add(uidField); + document.add(versionField); + document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + final BytesReference source = new BytesArray(new byte[]{1}); + final ParsedDocument parsedDocument = new ParsedDocument( + versionField, + seqID, + id, + type, + "routing", + Collections.singletonList(document), + source, + XContentType.JSON, + null); + + final Engine.Index index = new Engine.Index( + new Term("_id", parsedDocument.id()), + parsedDocument, + SequenceNumbers.UNASSIGNED_SEQ_NO, + (long) randomIntBetween(1, 8), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis(), + System.currentTimeMillis(), + randomBoolean()); + final Engine.IndexResult indexResult = e.index(index); + assertThat(indexResult.getSeqNo(), equalTo(seqNo)); + assertThat(seqNoGenerator.get(), equalTo(seqNo + 1)); + + final Engine.Delete delete = new Engine.Delete( + type, + id, + new Term("_id", parsedDocument.id()), + SequenceNumbers.UNASSIGNED_SEQ_NO, + (long) randomIntBetween(1, 8), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis()); + final Engine.DeleteResult deleteResult = e.delete(delete); + assertThat(deleteResult.getSeqNo(), equalTo(seqNo + 1)); + assertThat(seqNoGenerator.get(), equalTo(seqNo + 2)); + } + } + } diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 1447ea8ae50a9..cf4dab733f237 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -316,7 +316,7 @@ public long addDocument(Iterable doc) throws IOExcepti assert documentFailureMessage != null; throw new IOException(documentFailureMessage); } - }, null, config); + }, null, null, config); } } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 96f6aa6d47acb..844d6b0aaf957 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -637,6 +637,7 @@ public long addDocument(final Iterable doc) throws IOE } }, null, + null, config); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java new file mode 100644 index 0000000000000..6834d124c499a --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -0,0 +1,145 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.analysis.AnalyzerScope; +import org.elasticsearch.index.analysis.IndexAnalyzers; +import org.elasticsearch.index.analysis.NamedAnalyzer; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.DocumentMapperForType; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.RootObjectMapper; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesModule; +import org.elasticsearch.indices.mapper.MapperRegistry; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static org.elasticsearch.index.mapper.SourceToParse.source; + +public class TranslogHandler implements EngineConfig.TranslogRecoveryRunner { + + private final MapperService mapperService; + public Mapping mappingUpdate = null; + private final Map recoveredTypes = new HashMap<>(); + + private final AtomicLong appliedOperations = new AtomicLong(); + + long appliedOperations() { + return appliedOperations.get(); + } + + public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) { + NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer()); + IndexAnalyzers indexAnalyzers = + new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, emptyMap(), emptyMap()); + SimilarityService similarityService = new SimilarityService(indexSettings, null, emptyMap()); + MapperRegistry mapperRegistry = new IndicesModule(emptyList()).getMapperRegistry(); + mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry, + () -> null); + } + + private DocumentMapperForType docMapper(String type) { + RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type); + DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService); + return new DocumentMapperForType(b.build(mapperService), mappingUpdate); + } + + private void applyOperation(Engine engine, Engine.Operation operation) throws IOException { + switch (operation.operationType()) { + case INDEX: + Engine.Index engineIndex = (Engine.Index) operation; + Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate(); + if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) { + recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false)); + } + engine.index(engineIndex); + break; + case DELETE: + engine.delete((Engine.Delete) operation); + break; + case NO_OP: + engine.noOp((Engine.NoOp) operation); + break; + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } + + /** + * Returns the recovered types modifying the mapping during the recovery + */ + public Map getRecoveredTypes() { + return recoveredTypes; + } + + @Override + public int run(Engine engine, Translog.Snapshot snapshot) throws IOException { + int opsRecovered = 0; + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY)); + opsRecovered++; + appliedOperations.incrementAndGet(); + } + return opsRecovered; + } + + private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { + switch (operation.opType()) { + case INDEX: + final Translog.Index index = (Translog.Index) operation; + final String indexName = mapperService.index().getName(); + final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), + mapperService.getIndexSettings().getIndexVersionCreated(), + source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) + .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), + index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, + index.getAutoGeneratedIdTimestamp(), true); + return engineIndex; + case DELETE: + final Translog.Delete delete = (Translog.Delete) operation; + final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), + delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), + origin, System.nanoTime()); + return engineDelete; + case NO_OP: + final Translog.NoOp noOp = (Translog.NoOp) operation; + final Engine.NoOp engineNoOp = + new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason()); + return engineNoOp; + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } + +}