Skip to content

Commit

Permalink
[RCI] Add NoOpEngine for closed indices (#33903)
Browse files Browse the repository at this point in the history
This commit adds a new NoOpEngine implementation based on the current 
ReadOnlyEngine. This new implementation uses an empty DirectoryReader 
with no segment readers and will always return 0 docs. The NoOpEngine
is the default Engine created for IndexShards of closed indices. It expects 
an empty translog when it is instantiated.

Relates to #33888
  • Loading branch information
tlrx authored Sep 26, 2018
1 parent eae5487 commit d2be022
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 6 deletions.
149 changes: 149 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

/**
* NoOpEngine is an engine implementation that does nothing but the bare minimum
* required in order to have an engine. All attempts to do something (search,
* index, get), throw {@link UnsupportedOperationException}. This does maintain
* a translog with a deletion policy so that when flushing, no translog is
* retained on disk (setting a retention size and age of -1).
*
* It's also important to notice that this does list the commits of the Store's
* Directory so that the last commit's user data can be read for the historyUUID
* and last committed segment info.
*/
public final class NoOpEngine extends ReadOnlyEngine {

    /**
     * Creates a new NoOpEngine on top of the shard's last Lucene commit. The shard's
     * translog is opened and closed once, both to verify that its UUID matches the one
     * recorded in the last commit and to reject shards whose translog still contains
     * operations.
     *
     * @param engineConfig the engine configuration of the shard
     * @throws EngineCreationFailureException if the translog cannot be opened or is corrupted
     * @throws IllegalArgumentException if the translog is not empty
     */
    public NoOpEngine(EngineConfig engineConfig) {
        super(engineConfig, null, null, true, directoryReader -> directoryReader);
        boolean initialized = false;
        try {
            // A min age/size of -1 means no translog generation is ever retained on disk
            final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);

            // Opening the translog validates that its UUID matches the one stored in Lucene;
            // closing it immediately releases the underlying resources again
            try (Translog translog = openTranslog(engineConfig, deletionPolicy, engineConfig.getGlobalCheckpointSupplier())) {
                final int totalOperations = translog.totalOperations();
                if (totalOperations != 0) {
                    throw new IllegalArgumentException("Expected 0 translog operations but there were " + totalOperations);
                }
            }
            initialized = true;
        } catch (IOException | TranslogCorruptedException e) {
            throw new EngineCreationFailureException(shardId, "failed to create engine", e);
        } finally {
            if (initialized == false) {
                IOUtils.closeWhileHandlingException(this);
            }
        }
    }

    /**
     * Returns a {@link DirectoryReader} with no leaf readers that still exposes the
     * single commit point of the given directory, so callers can read the last commit's
     * user data and metadata without loading any segment data.
     */
    @Override
    protected DirectoryReader open(final Directory directory) throws IOException {
        final List<IndexCommit> commits = DirectoryReader.listCommits(directory);
        assert commits.size() == 1 : "expected only one commit point";
        final IndexCommit lastCommit = commits.get(commits.size() - 1);
        return new DirectoryReader(directory, new LeafReader[0]) {
            @Override
            protected DirectoryReader doOpenIfChanged() throws IOException {
                return null;
            }

            @Override
            protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException {
                return null;
            }

            @Override
            protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException {
                return null;
            }

            @Override
            public long getVersion() {
                return 0;
            }

            @Override
            public boolean isCurrent() throws IOException {
                // nothing is ever written, so the reader can never become stale
                return true;
            }

            @Override
            public IndexCommit getIndexCommit() throws IOException {
                return lastCommit;
            }

            @Override
            protected void doClose() throws IOException {
            }

            @Override
            public CacheHelper getReaderCacheHelper() {
                return null;
            }
        };
    }

    /**
     * Opens the shard's translog using the translog UUID recorded in the last Lucene commit.
     */
    private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
                                  LongSupplier globalCheckpointSupplier) throws IOException {
        // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
        final String translogUUID = loadTranslogUUIDFromLastCommit();
        return new Translog(engineConfig.getTranslogConfig(), translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
            engineConfig.getPrimaryTermSupplier());
    }

    /**
     * Reads the current stored translog ID from the last commit data.
     *
     * @throws IllegalStateException if the commit carries no translog generation id
     */
    @Nullable
    private String loadTranslogUUIDFromLastCommit() {
        final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
        if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
            throw new IllegalStateException("Commit doesn't contain translog generation id");
        }
        return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
    }

    @Override
    public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
        throw new UnsupportedOperationException("Translog synchronization should never be needed");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.IndexSearcher;
Expand Down Expand Up @@ -56,7 +57,7 @@
*
* @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
*/
public final class ReadOnlyEngine extends Engine {
public class ReadOnlyEngine extends Engine {

private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
Expand Down Expand Up @@ -95,15 +96,15 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(directory), config.getShardId());
reader = ElasticsearchDirectoryReader.wrap(open(directory), config.getShardId());
if (config.getIndexSettings().isSoftDeleteEnabled()) {
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}
reader = readerWrapperFunction.apply(reader);
this.indexCommit = reader.getIndexCommit();
this.searcherManager = new SearcherManager(reader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
this.docsStats = docsStats(reader);
this.docsStats = docsStats(lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
success = true;
} finally {
Expand All @@ -116,6 +117,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
}
}

/**
 * Opens a {@link DirectoryReader} on the given directory. Extension point so that
 * subclasses can substitute a different reader implementation while the rest of the
 * engine setup stays unchanged.
 */
protected DirectoryReader open(final Directory directory) throws IOException {
return DirectoryReader.open(directory);
}

@Override
protected void closeNoLock(String reason, CountDownLatch closedLatch) {
if (isClosed.compareAndSet(false, true)) {
Expand All @@ -129,6 +134,24 @@ protected void closeNoLock(String reason, CountDownLatch closedLatch) {
}
}

/**
 * Computes document statistics (live docs, deleted docs, store size in bytes) from
 * the segment infos of the last commit, without requiring an open reader.
 * Soft-deleted documents are counted as deleted.
 *
 * @throws UncheckedIOException if a segment's size cannot be read
 */
private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
    long liveDocs = 0;
    long deletedDocs = 0;
    long storeSizeInBytes = 0;
    if (lastCommittedSegmentInfos != null) {
        for (SegmentCommitInfo segment : lastCommittedSegmentInfos) {
            // hard deletes and soft deletes both count as deleted
            final long deletesInSegment = segment.getDelCount() + segment.getSoftDelCount();
            liveDocs += segment.info.maxDoc() - deletesInSegment;
            deletedDocs += deletesInSegment;
            try {
                storeSizeInBytes += segment.sizeInBytes();
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to get size for [" + segment.info.name + "]", e);
            }
        }
    }
    return new DocsStats(liveDocs, deletedDocs, storeSizeInBytes);
}

public static SeqNoStats buildSeqNoStats(SegmentInfos infos) {
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(infos.userData.entrySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
Expand Down Expand Up @@ -500,6 +501,12 @@ private synchronized IndexService createIndexService(final String reason,
}

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
final IndexMetaData indexMetaData = idxSettings.getIndexMetaData();
if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) {
// NoOpEngine takes precedence as long as the index is closed
return NoOpEngine::new;
}

final List<Optional<EngineFactory>> engineFactories =
engineFactoryProviders
.stream()
Expand Down
Loading

0 comments on commit d2be022

Please sign in to comment.