Skip to content

Commit

Permalink
Relax NoOpEngine constraints (#37413)
Browse files Browse the repository at this point in the history
When a NoOpEngine is instantiated, the current implementation verifies
that the translog contains no operations and that it contains the same
UUID as the last Lucene commit data. We can relax those two constraints
because the Close Index API now ensures that all translog operations are
flushed before closing a shard. The detection of coherence between the translog
UUID and the Lucene commit data is not specific to NoOpEngine, and is already
done by IndexShard.innerOpenEngineAndTranslog().

Related to #33888
  • Loading branch information
tlrx committed Jan 30, 2019
1 parent 54d110b commit cae4155
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,54 +24,20 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.stream.Stream;
import java.util.function.Function;

/**
* NoOpEngine is an engine implementation that does nothing but the bare minimum
* required in order to have an engine. All attempts to do something (search,
* index, get), throw {@link UnsupportedOperationException}. This does maintain
* a translog with a deletion policy so that when flushing, no translog is
* retained on disk (setting a retention size and age of 0).
*
* It's also important to notice that this does list the commits of the Store's
* Directory so that the last commit's user data can be read for the historyUUID
* and last committed segment info.
* index, get), throw {@link UnsupportedOperationException}.
*/
public final class NoOpEngine extends ReadOnlyEngine {

/**
 * Opens the engine read-only (via {@link ReadOnlyEngine}) and then validates the
 * shard's translog: it must be openable with the UUID stored in the last Lucene
 * commit and must contain zero operations. Any validation failure is surfaced as
 * an {@link EngineCreationFailureException}, after closing this engine to avoid
 * leaking the resources opened by the super constructor.
 *
 * @param engineConfig the configuration of the engine/shard being opened
 * @throws EngineCreationFailureException if the translog cannot be opened or is corrupted
 * @throws IllegalArgumentException if the translog contains operations
 */
public NoOpEngine(EngineConfig engineConfig) {
    // Reader is passed through unchanged; the 'true' flag opens the engine read-only.
    super(engineConfig, null, null, true, directoryReader -> directoryReader);
    boolean success = false;
    try {
        // The deletion policy for the translog should not keep any translogs around, so the min age/size is set to -1
        final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);

        // The translog is opened and closed to validate that the translog UUID from lucene is the same as the one in the translog
        try (Translog translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())) {
            final int nbOperations = translog.totalOperations();
            if (nbOperations != 0) {
                throw new IllegalArgumentException("Expected 0 translog operations but there were " + nbOperations);
            }
        }
        success = true;
    } catch (IOException | TranslogCorruptedException e) {
        throw new EngineCreationFailureException(shardId, "failed to create engine", e);
    } finally {
        if (success == false) {
            // The super constructor already opened resources; release them on any failure path.
            IOUtils.closeWhileHandlingException(this);
        }
    }
}
/**
 * Opens a no-op engine on the shard described by {@code config}, delegating all
 * setup to {@link ReadOnlyEngine} with a pass-through reader wrapper.
 *
 * @param config the configuration of the engine/shard being opened
 */
public NoOpEngine(EngineConfig config) {
    // Equivalent to Function.identity(): the directory reader is used as-is.
    super(config, null, null, true, directoryReader -> directoryReader);
}

@Override
Expand Down Expand Up @@ -121,30 +87,4 @@ public CacheHelper getReaderCacheHelper() {
}
};
}

/**
 * Opens the shard's existing translog using the UUID recorded in the last Lucene commit.
 *
 * @param engineConfig             source of the translog config and primary-term supplier
 * @param translogDeletionPolicy   deletion policy applied to the opened translog
 * @param globalCheckpointSupplier supplier of the current global checkpoint
 * @return the opened translog (caller is responsible for closing it)
 * @throws IOException if the translog cannot be opened
 */
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
                              LongSupplier globalCheckpointSupplier) throws IOException {
    // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
    final String uuid = loadTranslogUUIDFromLastCommit();
    return new Translog(engineConfig.getTranslogConfig(), uuid, translogDeletionPolicy,
        globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier());
}

/**
 * Reads the translog UUID stored in the last commit's user data.
 *
 * @return the stored translog UUID, or {@code null} if the commit has no UUID entry
 * @throws IllegalStateException if the commit carries no translog generation id
 */
@Nullable
private String loadTranslogUUIDFromLastCommit() {
    final Map<String, String> userData = getLastCommittedSegmentInfos().getUserData();
    if (userData.containsKey(Translog.TRANSLOG_GENERATION_KEY)) {
        return userData.get(Translog.TRANSLOG_UUID_KEY);
    }
    throw new IllegalStateException("Commit doesn't contain translog generation id");
}

/**
 * Always fails: per the class contract, all operational calls on this engine throw
 * {@link UnsupportedOperationException}, and no writes ever reach its translog.
 *
 * @param locations translog locations to sync (ignored)
 * @throws UnsupportedOperationException always
 */
@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
    throw new UnsupportedOperationException("Translog synchronization should never be needed");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;

import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;

import java.io.IOException;

import static org.elasticsearch.cluster.routing.ShardRoutingHelper.initWithSameId;

/**
 * Verifies that shards backed by a {@link NoOpEngine} can recover both from an
 * existing store (primary) and via peer recovery (replica), retaining all documents.
 */
public class NoOpEngineRecoveryTests extends IndexShardTestCase {

    public void testRecoverFromNoOp() throws IOException {
        final int docCount = scaledRandomIntBetween(1, 100);

        // Index some documents on a started primary, then close it.
        final IndexShard initialShard = newStartedShard(true);
        for (int docId = 0; docId < docCount; docId++) {
            indexDoc(initialShard, "_doc", String.valueOf(docId));
        }
        initialShard.close("test", true);

        // Reinitialize the shard with a NoOpEngine and recover it from its existing store.
        final ShardRouting routing = initialShard.routingEntry();
        final IndexShard primary =
            reinitShard(initialShard, initWithSameId(routing, ExistingStoreRecoverySource.INSTANCE), NoOpEngine::new);
        recoverShardFromStore(primary);
        assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes());
        assertEquals(docCount, primary.docStats().getCount());

        // A replica recovered from that primary must also end up with every document.
        final IndexShard replica = newShard(false, Settings.EMPTY, NoOpEngine::new);
        recoverReplica(replica, primary, true);
        assertEquals(replica.seqNoStats().getMaxSeqNo(), replica.getMaxSeqNoOfUpdatesOrDeletes());
        assertEquals(docCount, replica.docStats().getCount());
        closeShards(primary, replica);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
Expand All @@ -37,8 +35,6 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.test.IndexSettingsModule;

Expand All @@ -50,7 +46,6 @@

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

public class NoOpEngineTests extends EngineTestCase {
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
Expand All @@ -59,7 +54,6 @@ public void testNoopEngine() throws IOException {
engine.close();
final NoOpEngine engine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir));
expectThrows(UnsupportedOperationException.class, () -> engine.syncFlush(null, null));
expectThrows(UnsupportedOperationException.class, () -> engine.ensureTranslogSynced(null));
assertThat(engine.refreshNeeded(), equalTo(false));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
engine.close();
Expand Down Expand Up @@ -106,63 +100,6 @@ public void testNoopAfterRegularEngine() throws IOException {
noOpEngine.close();
}

/**
 * Verifies that constructing a {@link NoOpEngine} against a translog directory whose
 * UUID does not match the store's last commit fails with an
 * {@link EngineCreationFailureException} caused by a {@link TranslogCorruptedException}.
 */
public void testNoopEngineWithInvalidTranslogUUID() throws IOException {
    IOUtils.close(engine, store);
    final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
    try (Store store = createStore()) {
        EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
        int numDocs = scaledRandomIntBetween(10, 100);
        try (InternalEngine engine = createEngine(config)) {
            // Populate the store; occasional flushes vary the commit layout across runs.
            for (int i = 0; i < numDocs; i++) {
                ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
                engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
                    System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
                if (rarely()) {
                    engine.flush();
                }
                globalCheckpoint.set(engine.getLocalCheckpoint());
            }
            flushAndTrimTranslog(engine);
        }

        final Path newTranslogDir = createTempDir();
        // A new translog will have a different UUID than the existing store/noOp engine does
        Translog newTranslog = createTranslog(newTranslogDir, () -> 1L);
        newTranslog.close();

        // Opening a NoOpEngine over the mismatched translog must fail with a corruption cause.
        EngineCreationFailureException e = expectThrows(EngineCreationFailureException.class,
            () -> new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, newTranslogDir)));
        assertThat(e.getCause(), instanceOf(TranslogCorruptedException.class));
    }
}

/**
 * Verifies that constructing a {@link NoOpEngine} over a shard whose translog still
 * contains operations fails with an {@link IllegalArgumentException} that reports the
 * number of pending operations.
 */
public void testNoopEngineWithNonZeroTranslogOperations() throws IOException {
    IOUtils.close(engine, store);
    final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
    try (Store store = createStore()) {
        // NOTE(review): NoMergePolicy disables merging here — presumably to keep the
        // translog/segment state predictable for the assertion below; confirm intent.
        final MergePolicy mergePolicy = NoMergePolicy.INSTANCE;
        EngineConfig config = config(defaultSettings, store, createTempDir(), mergePolicy, null, null, globalCheckpoint::get);
        int numDocs = scaledRandomIntBetween(10, 100);
        try (InternalEngine engine = createEngine(config)) {
            for (int i = 0; i < numDocs; i++) {
                ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
                engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
                    System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
                if (rarely()) {
                    engine.flush();
                }
                globalCheckpoint.set(engine.getLocalCheckpoint());
            }
            engine.syncTranslog();
            engine.flushAndClose();
            engine.close();

            // The NoOpEngine constructor must reject the non-empty translog.
            IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new NoOpEngine(engine.engineConfig));
            assertThat(e.getMessage(), is("Expected 0 translog operations but there were " + numDocs));
        }
    }
}

public void testNoOpEngineDocStats() throws Exception {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Expand Down

0 comments on commit cae4155

Please sign in to comment.