From a566942219d8a84deee719adb46860f69499431a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 30 Oct 2017 13:10:20 -0400 Subject: [PATCH] Refactor internal engine This commit is a minor refactoring of internal engine to move hooks for generating sequence numbers into the engine itself. As such, we refactor tests that relied on this hook to use the new hook, and remove the hook from the sequence number service itself. Relates #27082 --- .../elasticsearch/index/engine/Engine.java | 6 +- .../index/engine/InternalEngine.java | 113 +++-- .../index/seqno/SequenceNumbersService.java | 2 +- .../index/engine/InternalEngineTests.java | 401 +++++++++--------- .../IndexLevelReplicationTests.java | 2 +- .../RecoveryDuringReplicationTests.java | 1 + .../index/engine/TranslogHandler.java | 145 +++++++ 7 files changed, 416 insertions(+), 254 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 1cf18cb4ee5c8..4299fa0cb6ea3 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -572,7 +572,11 @@ public CommitStats commitStats() { return new CommitStats(getLastCommittedSegmentInfos()); } - /** get the sequence number service */ + /** + * The sequence number service for this engine. + * + * @return the sequence number service + */ public abstract SequenceNumbersService seqNoService(); /** diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 886ef66225720..76b367dd418d2 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -145,6 +145,12 @@ public class InternalEngine extends Engine { private final String historyUUID; public InternalEngine(EngineConfig engineConfig) { + this(engineConfig, InternalEngine::sequenceNumberService); + } + + InternalEngine( + final EngineConfig engineConfig, + final BiFunction seqNoServiceSupplier) { super(engineConfig); openMode = engineConfig.getOpenMode(); if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { @@ -152,11 +158,11 @@ public InternalEngine(EngineConfig engineConfig) { } this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME; final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( - engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), - engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() ); this.deletionPolicy = new CombinedDeletionPolicy( - new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); + new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); store.incRef(); IndexWriter writer = null; Translog translog = null; @@ -184,20 +190,20 @@ public InternalEngine(EngineConfig engineConfig) { case CREATE_INDEX_AND_TRANSLOG: writer = createWriter(true); seqNoStats = new SeqNoStats( - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO); + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.UNASSIGNED_SEQ_NO); break; default: throw new IllegalArgumentException(openMode.toString()); } logger.trace("recovered [{}]", seqNoStats); - seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats); + seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID()); Objects.requireNonNull(historyUUID, "history uuid should not be null"); indexWriter = writer; - translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint()); + translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService.getGlobalCheckpoint()); assert translog.getGeneration() != null; this.translog = translog; updateWriterOnOpen(); @@ -240,12 +246,12 @@ public InternalEngine(EngineConfig engineConfig) { public void restoreLocalCheckpointFromTranslog() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); - final long localCheckpoint = seqNoService().getLocalCheckpoint(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() > localCheckpoint) { - seqNoService().markSeqNoAsCompleted(operation.seqNo()); + seqNoService.markSeqNoAsCompleted(operation.seqNo()); } } } @@ -256,17 +262,17 @@ public void restoreLocalCheckpointFromTranslog() throws IOException { public int fillSeqNoGaps(long primaryTerm) throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); - final long localCheckpoint = seqNoService().getLocalCheckpoint(); - final long maxSeqNo = seqNoService().getMaxSeqNo(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); + final long maxSeqNo = seqNoService.getMaxSeqNo(); int numNoOpsAdded = 0; for ( long seqNo = localCheckpoint + 1; seqNo <= maxSeqNo; - seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) { + seqNo = seqNoService.getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) { innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps")); numNoOpsAdded++; - assert seqNo <= seqNoService().getLocalCheckpoint() - : "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]"; + assert seqNo <= seqNoService.getLocalCheckpoint() + : "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService.getLocalCheckpoint() + "]"; } return numNoOpsAdded; @@ -284,15 +290,13 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp)); } - private static SequenceNumbersService sequenceNumberService( - final ShardId shardId, - final String allocationId, - final IndexSettings indexSettings, + static SequenceNumbersService sequenceNumberService( + final EngineConfig engineConfig, final SeqNoStats seqNoStats) { return new SequenceNumbersService( - shardId, - allocationId, - indexSettings, + engineConfig.getShardId(), + engineConfig.getAllocationId(), + engineConfig.getIndexSettings(), seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), seqNoStats.getGlobalCheckpoint()); @@ -621,8 +625,7 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" + " index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo; } else if (origin == Operation.Origin.PRIMARY) { - // sequence number should not be set when operation origin is primary - assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "primary ops should never have an assigned seq no.; seqNo: " + seqNo; + assert assertOriginPrimarySequenceNumber(seqNo); } else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) { // sequence number should be set when operation origin is not primary assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin; @@ -630,6 +633,13 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi return true; } + protected boolean assertOriginPrimarySequenceNumber(final long seqNo) { + // sequence number should not be set when operation origin is primary + assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO + : "primary operations must never have an assigned sequence number but was [" + seqNo + "]"; + return true; + } + private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) || origin == Operation.Origin.PRIMARY) { @@ -639,6 +649,20 @@ private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin return true; } + private long generateSeqNoForOperation(final Operation operation) { + assert operation.origin() == Operation.Origin.PRIMARY; + return doGenerateSeqNoForOperation(operation); + } + + /** + * Generate the sequence number for the specified operation. + * + * @param operation the operation + * @return the sequence number + */ + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoService.generateSeqNo(); + } @Override public IndexResult index(Index index) throws IOException { @@ -708,7 +732,7 @@ public IndexResult index(Index index) throws IOException { indexResult.setTranslogLocation(location); } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(indexResult.getSeqNo()); + seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo()); } indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); @@ -728,14 +752,12 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio final IndexingStrategy plan; if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) { // no need to deal with out of order delivery - we never saw this one - assert index.version() == 1L : - "can optimize on replicas but incoming version is [" + index.version() + "]"; + assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); } else { // drop out of order operations assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() : - "resolving out of order delivery based on versioning but version type isn't fit for it. got [" - + index.versionType() + "]"; + "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]"; // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity @@ -770,15 +792,14 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 } private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { - assert index.origin() == Operation.Origin.PRIMARY : - "planing as primary but origin isn't. got " + index.origin(); + assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay if (canOptimizeAddDocument(index)) { if (mayHaveBeenIndexedBefore(index)) { - plan = IndexingStrategy.overrideExistingAsIfNotThere(seqNoService().generateSeqNo(), 1L); + plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); } else { - plan = IndexingStrategy.optimizedAppendOnly(seqNoService().generateSeqNo()); + plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); } } else { // resolves incoming version @@ -799,7 +820,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); } else { plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, - seqNoService().generateSeqNo(), + generateSeqNoForOperation(index), index.versionType().updateVersion(currentVersion, index.version()) ); } @@ -1008,7 +1029,7 @@ public DeleteResult delete(Delete delete) throws IOException { deleteResult.setTranslogLocation(location); } if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo()); + seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo()); } deleteResult.setTook(System.nanoTime() - delete.startTime()); deleteResult.freeze(); @@ -1025,8 +1046,7 @@ public DeleteResult delete(Delete delete) throws IOException { } private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { - assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " - + delete.origin(); + assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); // drop out of order operations assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : "resolving out of order delivery based on versioning but version type isn't fit for it. got [" @@ -1064,8 +1084,7 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 } private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { - assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " - + delete.origin(); + assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); // resolve operation from external to internal final VersionValue versionValue = resolveDocVersion(delete); assert incrementVersionLookup(); @@ -1083,9 +1102,10 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); } else { - plan = DeletionStrategy.processNormally(currentlyDeleted, - seqNoService().generateSeqNo(), - delete.versionType().updateVersion(currentVersion, delete.version())); + plan = DeletionStrategy.processNormally( + currentlyDeleted, + generateSeqNoForOperation(delete), + delete.versionType().updateVersion(currentVersion, delete.version())); } return plan; } @@ -1186,7 +1206,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { return noOpResult; } finally { if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(seqNo); + seqNoService.markSeqNoAsCompleted(seqNo); } } } @@ -1921,7 +1941,7 @@ protected void doRun() throws Exception { protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException { ensureCanFlush(); try { - final long localCheckpoint = seqNoService().getLocalCheckpoint(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); final String translogUUID = translogGeneration.translogUUID; @@ -1944,7 +1964,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl if (syncId != null) { commitData.put(Engine.SYNC_COMMIT_ID, syncId); } - commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); + commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService.getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); logger.trace("committing writer with commit data [{}]", commitData); @@ -2008,8 +2028,7 @@ public MergeStats getMergeStats() { return mergeScheduler.stats(); } - @Override - public SequenceNumbersService seqNoService() { + public final SequenceNumbersService seqNoService() { return seqNoService; } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 1c8911a0cd886..1c0b320558400 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -66,7 +66,7 @@ public SequenceNumbersService( * * @return the next assigned sequence number */ - public long generateSeqNo() { + public final long generateSeqNo() { return localCheckpointTracker.generateSeqNo(); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d2b15c0a113d9..f6d2d99658098 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.filter.RegexFilter; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat; import org.apache.lucene.document.Field; @@ -95,46 +94,36 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.analysis.AnalyzerScope; -import org.elasticsearch.index.analysis.IndexAnalyzers; -import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine.Searcher; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.ContentPath; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.DocumentMapperForType; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.Mapper.BuilderContext; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexSearcherWrapper; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; -import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.indices.IndicesModule; -import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -171,9 +160,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.function.ToLongBiFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -184,7 +173,6 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; -import static org.elasticsearch.index.mapper.SourceToParse.source; import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; @@ -356,11 +344,19 @@ protected InternalEngine createEngine(Store store, Path translogPath) throws IOE return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); } - protected InternalEngine createEngine(Store store, Path translogPath, - Function sequenceNumbersServiceSupplier) throws IOException { + protected InternalEngine createEngine(Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier) throws IOException { return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier); } + protected InternalEngine createEngine(Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier, + ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null); + } + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { return createEngine(indexSettings, store, translogPath, mergePolicy, null); @@ -377,8 +373,19 @@ protected InternalEngine createEngine( Path translogPath, MergePolicy mergePolicy, @Nullable IndexWriterFactory indexWriterFactory, - @Nullable Function sequenceNumbersServiceSupplier) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null); + @Nullable BiFunction sequenceNumbersServiceSupplier) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, null); } protected InternalEngine createEngine( @@ -387,10 +394,11 @@ protected InternalEngine createEngine( Path translogPath, MergePolicy mergePolicy, @Nullable IndexWriterFactory indexWriterFactory, - @Nullable Function sequenceNumbersServiceSupplier, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation, @Nullable Sort indexSort) throws IOException { EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); - InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config); + InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config); if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); } @@ -404,21 +412,39 @@ public interface IndexWriterFactory { } public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory, - @Nullable final Function sequenceNumbersServiceSupplier, + @Nullable final BiFunction sequenceNumbersServiceSupplier, + @Nullable final ToLongBiFunction seqNoForOperation, final EngineConfig config) { - return new InternalEngine(config) { - @Override - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return (indexWriterFactory != null) ? - indexWriterFactory.createWriter(directory, iwc) : - super.createWriter(directory, iwc); - } + if (sequenceNumbersServiceSupplier == null) { + return new InternalEngine(config) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } + + @Override + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); + } + }; + } else { + return new InternalEngine(config, sequenceNumbersServiceSupplier) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } @Override - public SequenceNumbersService seqNoService() { - return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.apply(config) : super.seqNoService(); + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); } }; + } + } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, @@ -672,8 +698,8 @@ public void testSegmentsWithMergeFlag() throws Exception { public void testSegmentsWithIndexSort() throws Exception { Sort indexSort = new Sort(new SortedSetSortField("_type", false)); try (Store store = createStore(); - Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - null, null, indexSort)) { + Engine engine = + createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, null, indexSort)) { List segments = engine.segments(true); assertThat(segments.isEmpty(), equalTo(true)); @@ -729,13 +755,28 @@ public void testCommitStats() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); try ( Store store = createStore(); - InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService( - config.getShardId(), - config.getAllocationId(), - config.getIndexSettings(), - maxSeqNo.get(), - localCheckpoint.get(), - globalCheckpoint.get()) + InternalEngine engine = createEngine(store, createTempDir(), (config, seqNoStats) -> new SequenceNumbersService( + config.getShardId(), + config.getAllocationId(), + config.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()) { + @Override + public long getMaxSeqNo() { + return maxSeqNo.get(); + } + + @Override + public long getLocalCheckpoint() { + return localCheckpoint.get(); + } + + @Override + public long getGlobalCheckpoint() { + return globalCheckpoint.get(); + } + } )) { CommitStats stats1 = engine.commitStats(); assertThat(stats1.getGeneration(), greaterThan(0L)); @@ -902,20 +943,11 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { Store store = createStore(); final AtomicInteger counter = new AtomicInteger(); try { - initialEngine = createEngine(store, createTempDir(), (config) -> - new SequenceNumbersService( - config.getShardId(), - config.getAllocationId(), - config.getIndexSettings(), - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO) { - @Override - public long generateSeqNo() { - return seqNos.get(counter.getAndIncrement()); - } - } - ); + initialEngine = createEngine( + store, + createTempDir(), + InternalEngine::sequenceNumberService, + (engine, operation) -> seqNos.get(counter.getAndIncrement())); for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); @@ -2711,7 +2743,7 @@ public void testTranslogReplay() throws IOException { assertVisibleCount(engine, numDocs, false); parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(numDocs, parser.appliedOperations.get()); + assertEquals(numDocs, parser.appliedOperations()); if (parser.mappingUpdate != null) { assertEquals(1, parser.getRecoveredTypes().size()); assertTrue(parser.getRecoveredTypes().containsKey("test")); @@ -2723,7 +2755,7 @@ public void testTranslogReplay() throws IOException { engine = createEngine(store, primaryTranslogDir); assertVisibleCount(engine, numDocs, false); parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(0, parser.appliedOperations.get()); + assertEquals(0, parser.appliedOperations()); final boolean flush = randomBoolean(); int randomId = randomIntBetween(numDocs + 1, numDocs + 10); @@ -2753,7 +2785,7 @@ public void testTranslogReplay() throws IOException { assertThat(topDocs.totalHits, equalTo(numDocs + 1L)); } parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(flush ? 1 : 2, parser.appliedOperations.get()); + assertEquals(flush ? 1 : 2, parser.appliedOperations()); engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc))); if (randomBoolean()) { engine.refresh("test"); @@ -2767,97 +2799,6 @@ public void testTranslogReplay() throws IOException { } } - public static class TranslogHandler implements EngineConfig.TranslogRecoveryRunner { - - private final MapperService mapperService; - public Mapping mappingUpdate = null; - private final Map recoveredTypes = new HashMap<>(); - private final AtomicLong appliedOperations = new AtomicLong(); - - public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) { - NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer()); - IndexAnalyzers indexAnalyzers = new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, Collections.emptyMap(), Collections.emptyMap()); - SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap()); - MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); - mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry, - () -> null); - } - - private DocumentMapperForType docMapper(String type) { - RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type); - DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService); - return new DocumentMapperForType(b.build(mapperService), mappingUpdate); - } - - private void applyOperation(Engine engine, Engine.Operation operation) throws IOException { - switch (operation.operationType()) { - case INDEX: - Engine.Index engineIndex = (Engine.Index) operation; - Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate(); - if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) { - recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false)); - } - engine.index(engineIndex); - break; - case DELETE: - engine.delete((Engine.Delete) operation); - break; - case NO_OP: - engine.noOp((Engine.NoOp) operation); - break; - default: - throw new IllegalStateException("No operation defined for [" + operation + "]"); - } - } - - /** - * Returns the recovered types modifying the mapping during the recovery - */ - public Map getRecoveredTypes() { - return recoveredTypes; - } - - @Override - public int run(Engine engine, Translog.Snapshot snapshot) throws IOException { - int opsRecovered = 0; - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY)); - opsRecovered++; - appliedOperations.incrementAndGet(); - } - return opsRecovered; - } - - private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { - switch (operation.opType()) { - case INDEX: - final Translog.Index index = (Translog.Index) operation; - final String indexName = mapperService.index().getName(); - final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), - mapperService.getIndexSettings().getIndexVersionCreated(), - source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) - .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), - index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, - index.getAutoGeneratedIdTimestamp(), true); - return engineIndex; - case DELETE: - final Translog.Delete delete = (Translog.Delete) operation; - final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), - origin, System.nanoTime()); - return engineDelete; - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) operation; - final Engine.NoOp engineNoOp = - new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason()); - return engineNoOp; - default: - throw new IllegalStateException("No operation defined for [" + operation + "]"); - } - } - } - public void testRecoverFromForeignTranslog() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { @@ -3786,47 +3727,38 @@ public void testSequenceIDs() throws Exception { } /** - * A sequence number service that will generate a sequence number and if {@code stall} is set to {@code true} will wait on the barrier - * and the referenced latch before returning. If the local checkpoint should advance (because {@code stall} is {@code false}), then the - * value of {@code expectedLocalCheckpoint} is set accordingly. + * A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the + * referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of + * {@code expectedLocalCheckpoint} is set accordingly. * * @param latchReference to latch the thread for the purpose of stalling * @param barrier to signal the thread has generated a new sequence number * @param stall whether or not the thread should stall * @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence * number - * @return a sequence number service + * @return a sequence number generator */ - private SequenceNumbersService getStallingSeqNoService( + private ToLongBiFunction getStallingSeqNoGenerator( final AtomicReference latchReference, final CyclicBarrier barrier, final AtomicBoolean stall, final AtomicLong expectedLocalCheckpoint) { - return new SequenceNumbersService( - shardId, - allocationId.getId(), - defaultSettings, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO) { - @Override - public long generateSeqNo() { - final long seqNo = super.generateSeqNo(); - final CountDownLatch latch = latchReference.get(); - if (stall.get()) { - try { - barrier.await(); - latch.await(); - } catch (BrokenBarrierException | InterruptedException e) { - throw new RuntimeException(e); - } - } else { - if (expectedLocalCheckpoint.get() + 1 == seqNo) { - expectedLocalCheckpoint.set(seqNo); - } + return (engine, operation) -> { + final long seqNo = engine.seqNoService().generateSeqNo(); + final CountDownLatch latch = latchReference.get(); + if (stall.get()) { + try { + barrier.await(); + latch.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { + if (expectedLocalCheckpoint.get() + 1 == seqNo) { + expectedLocalCheckpoint.set(seqNo); } - return seqNo; } + return seqNo; }; } @@ -3840,8 +3772,8 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro final AtomicBoolean stall = new AtomicBoolean(); final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final List threads = new ArrayList<>(); - final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint); - initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService); + initialEngine = + createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint)); final InternalEngine finalInitialEngine = initialEngine; for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); @@ -4015,17 +3947,17 @@ public void testNoOps() throws IOException { final int localCheckpoint = randomIntBetween(0, maxSeqNo); final int globalCheckpoint = randomIntBetween(0, localCheckpoint); try { - final SequenceNumbersService seqNoService = - new SequenceNumbersService(shardId, allocationId.getId(), defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) { - @Override - public long generateSeqNo() { - throw new UnsupportedOperationException(); - } - }; - noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { + final BiFunction supplier = (engineConfig, ignored) -> new SequenceNumbersService( + engineConfig.getShardId(), + engineConfig.getAllocationId(), + engineConfig.getIndexSettings(), + maxSeqNo, + localCheckpoint, + globalCheckpoint); + noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier) { @Override - public SequenceNumbersService seqNoService() { - return seqNoService; + protected long doGenerateSeqNoForOperation(Operation operation) { + throw new UnsupportedOperationException(); } }; noOpEngine.recoverFromTranslog(); @@ -4070,8 +4002,8 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti final AtomicBoolean stall = new AtomicBoolean(); final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final Map threads = new LinkedHashMap<>(); - final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint); - actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService); + actualEngine = + createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint)); final InternalEngine finalActualEngine = actualEngine; final Translog translog = finalActualEngine.getTranslog(); final long generation = finalActualEngine.getTranslog().currentFileGeneration(); @@ -4160,26 +4092,20 @@ public void testRestoreLocalCheckpointFromTranslog() throws IOException { InternalEngine actualEngine = null; try { final Set completedSeqNos = new HashSet<>(); - final SequenceNumbersService seqNoService = - new SequenceNumbersService( - shardId, - allocationId.getId(), - defaultSettings, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO) { + final BiFunction supplier = (engineConfig, seqNoStats) -> new SequenceNumbersService( + engineConfig.getShardId(), + engineConfig.getAllocationId(), + engineConfig.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()) { @Override public void markSeqNoAsCompleted(long seqNo) { super.markSeqNoAsCompleted(seqNo); completedSeqNos.add(seqNo); } }; - actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { - @Override - public SequenceNumbersService seqNoService() { - return seqNoService; - } - }; + actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier); final int operations = randomIntBetween(0, 1024); final Set expectedCompletedSeqNos = new HashSet<>(); for (int i = 0; i < operations; i++) { @@ -4347,4 +4273,71 @@ public void testRefreshScopedSearcher() throws IOException { assertSameReader(getSearcher, searchSearcher); } } + + public void testSeqNoGenerator() throws IOException { + engine.close(); + final long seqNo = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE); + final BiFunction seqNoService = (config, seqNoStats) -> new SequenceNumbersService( + config.getShardId(), + config.getAllocationId(), + config.getIndexSettings(), + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong seqNoGenerator = new AtomicLong(seqNo); + try (Engine e = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, seqNoService, (engine, operation) -> seqNoGenerator.getAndIncrement())) { + final String id = "id"; + final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); + final String type = "type"; + final Field versionField = new NumericDocValuesField("_version", 0); + final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + final ParseContext.Document document = new ParseContext.Document(); + document.add(uidField); + document.add(versionField); + document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + final BytesReference source = new BytesArray(new byte[]{1}); + final ParsedDocument parsedDocument = new ParsedDocument( + versionField, + seqID, + id, + type, + "routing", + Collections.singletonList(document), + source, + XContentType.JSON, + null); + + final Engine.Index index = new Engine.Index( + new Term("_id", parsedDocument.id()), + parsedDocument, + SequenceNumbers.UNASSIGNED_SEQ_NO, + (long) randomIntBetween(1, 8), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis(), + System.currentTimeMillis(), + randomBoolean()); + final Engine.IndexResult indexResult = e.index(index); + assertThat(indexResult.getSeqNo(), equalTo(seqNo)); + assertThat(seqNoGenerator.get(), equalTo(seqNo + 1)); + + final Engine.Delete delete = new Engine.Delete( + type, + id, + new Term("_id", parsedDocument.id()), + SequenceNumbers.UNASSIGNED_SEQ_NO, + (long) randomIntBetween(1, 8), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis()); + final Engine.DeleteResult deleteResult = e.delete(delete); + assertThat(deleteResult.getSeqNo(), equalTo(seqNo + 1)); + assertThat(seqNoGenerator.get(), equalTo(seqNo + 2)); + } + } + } diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 1447ea8ae50a9..cf4dab733f237 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -316,7 +316,7 @@ public long addDocument(Iterable doc) throws IOExcepti assert documentFailureMessage != null; throw new IOException(documentFailureMessage); } - }, null, config); + }, null, null, config); } } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 96f6aa6d47acb..844d6b0aaf957 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -637,6 +637,7 @@ public long addDocument(final Iterable doc) throws IOE } }, null, + null, config); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java new file mode 100644 index 0000000000000..6834d124c499a --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -0,0 +1,145 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.analysis.AnalyzerScope; +import org.elasticsearch.index.analysis.IndexAnalyzers; +import org.elasticsearch.index.analysis.NamedAnalyzer; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.DocumentMapperForType; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.RootObjectMapper; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesModule; +import org.elasticsearch.indices.mapper.MapperRegistry; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static org.elasticsearch.index.mapper.SourceToParse.source; + +public class TranslogHandler implements EngineConfig.TranslogRecoveryRunner { + + private final MapperService mapperService; + public Mapping mappingUpdate = null; + private final Map recoveredTypes = new HashMap<>(); + + private final AtomicLong appliedOperations = new AtomicLong(); + + long appliedOperations() { + return appliedOperations.get(); + } + + public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) { + NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer()); + IndexAnalyzers indexAnalyzers = + new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, emptyMap(), emptyMap()); + SimilarityService similarityService = new SimilarityService(indexSettings, null, emptyMap()); + MapperRegistry mapperRegistry = new IndicesModule(emptyList()).getMapperRegistry(); + mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry, + () -> null); + } + + private DocumentMapperForType docMapper(String type) { + RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type); + DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService); + return new DocumentMapperForType(b.build(mapperService), mappingUpdate); + } + + private void applyOperation(Engine engine, Engine.Operation operation) throws IOException { + switch (operation.operationType()) { + case INDEX: + Engine.Index engineIndex = (Engine.Index) operation; + Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate(); + if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) { + recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false)); + } + engine.index(engineIndex); + break; + case DELETE: + engine.delete((Engine.Delete) operation); + break; + case NO_OP: + engine.noOp((Engine.NoOp) operation); + break; + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } + + /** + * Returns the recovered types modifying the mapping during the recovery + */ + public Map getRecoveredTypes() { + return recoveredTypes; + } + + @Override + public int run(Engine engine, Translog.Snapshot snapshot) throws IOException { + int opsRecovered = 0; + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY)); + opsRecovered++; + appliedOperations.incrementAndGet(); + } + return opsRecovered; + } + + private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { + switch (operation.opType()) { + case INDEX: + final Translog.Index index = (Translog.Index) operation; + final String indexName = mapperService.index().getName(); + final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), + mapperService.getIndexSettings().getIndexVersionCreated(), + source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) + .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), + index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, + index.getAutoGeneratedIdTimestamp(), true); + return engineIndex; + case DELETE: + final Translog.Delete delete = (Translog.Delete) operation; + final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), + delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), + origin, System.nanoTime()); + return engineDelete; + case NO_OP: + final Translog.NoOp noOp = (Translog.NoOp) operation; + final Engine.NoOp engineNoOp = + new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason()); + return engineNoOp; + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } + +}