Commit 352e59c

Fix doc_stats and segment_stats of ReadOnlyEngine (#53345)
We can't always have the same segment stats and doc stats for
InternalEngine and ReadOnlyEngine if there are fully deleted
segments: ReadOnlyEngine always filters them out, while InternalEngine
keeps them if peer recovery retention leases exist or the number of
retained operations is non-zero.

This change reverts the fix in #51331 and uses the wrapped reader to
calculate the segment stats and doc stats. For the test, we need to
disable the extra retention of soft-deleted operations.

Closes #51303
dnhatn authored Mar 10, 2020
1 parent 2b96a6b commit 352e59c
Showing 4 changed files with 38 additions and 48 deletions.
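
For context, the filtering behavior described in the commit message can be reproduced with a few lines of Lucene. The standalone sketch below is an editorial illustration, not part of this commit; the class name, the soft-deletes field name, and the in-memory directory are assumptions. A raw DirectoryReader still exposes a segment whose only document has been soft-deleted, whereas SoftDeletesDirectoryReaderWrapper drops leaves with no live docs. That is the discrepancy behind #51303: InternalEngine may keep such segments around for peer recovery, while the wrapped reader in ReadOnlyEngine never sees them.

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class FullyDeletedSegmentDemo {
    public static void main(String[] args) throws Exception {
        Directory dir = new ByteBuffersDirectory();
        IndexWriterConfig iwc = new IndexWriterConfig().setSoftDeletesField("__soft_deletes");
        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
            Document doc = new Document();
            doc.add(new StringField("id", "1", Field.Store.NO));
            writer.addDocument(doc);
            writer.flush(); // the doc gets a segment of its own
            // soft-delete the only doc: the segment is now fully deleted,
            // but soft deletes keep it in the commit
            writer.updateDocValues(new Term("id", "1"), new NumericDocValuesField("__soft_deletes", 1));
            writer.commit();
        }
        try (DirectoryReader raw = DirectoryReader.open(dir);
             DirectoryReader wrapped =
                 new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(dir), "__soft_deletes")) {
            System.out.println("raw leaves: " + raw.leaves().size());         // 1, segment still visible
            System.out.println("wrapped leaves: " + wrapped.leaves().size()); // 0, wrapper filtered it out
        }
    }
}

Computing both doc stats and segment stats from the same wrapped reader, as this commit does, keeps the two views consistent by construction.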
NoOpEngine.java
@@ -29,6 +29,7 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.seqno.SequenceNumbers;
+ import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@@ -49,18 +50,19 @@
*/
public final class NoOpEngine extends ReadOnlyEngine {

- private final SegmentsStats stats;
+ private final SegmentsStats segmentsStats;
+ private final DocsStats docsStats;

public NoOpEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
- this.stats = new SegmentsStats();
+ this.segmentsStats = new SegmentsStats();
Directory directory = store.directory();
- // Do not wrap soft-deletes reader when calculating segment stats as the wrapper might filter out fully deleted segments.
- try (DirectoryReader reader = openDirectory(directory, false)) {
+ try (DirectoryReader reader = openDirectory(directory)) {
for (LeafReaderContext ctx : reader.getContext().leaves()) {
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
- fillSegmentStats(segmentReader, true, stats);
+ fillSegmentStats(segmentReader, true, segmentsStats);
}
+ this.docsStats = docsStats(reader);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -117,7 +119,7 @@ public CacheHelper getReaderCacheHelper() {
public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
if (includeUnloadedSegments) {
final SegmentsStats stats = new SegmentsStats();
- stats.add(this.stats);
+ stats.add(this.segmentsStats);
if (includeSegmentFileSizes == false) {
stats.clearFileSizes();
}
@@ -127,6 +129,11 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
}
}

+ @Override
+ public DocsStats docStats() {
+     return docsStats;
+ }
+
/**
* This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
* that retains nothing but the last translog generation from safe commit.
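The docsStats(reader) call added to the constructor above resolves to a helper inherited from the Engine base class, which this diff does not show. Below is a hedged sketch of that helper, reconstructed for illustration rather than quoted verbatim: it counts per leaf of whatever reader it is handed, so a fully deleted segment that the soft-deletes wrapper already filtered out is never visited.

// Sketch of the inherited Engine#docsStats(IndexReader) helper; assumes the
// imports available in Engine.java (IndexReader, LeafReaderContext,
// SegmentReader, Lucene, DocsStats, IOException).
protected final DocsStats docsStats(IndexReader indexReader) {
    long numDocs = 0;
    long numDeletedDocs = 0;
    long sizeInBytes = 0;
    for (LeafReaderContext readerContext : indexReader.leaves()) {
        // drop to the segment level for accurate per-segment numbers
        final SegmentReader segmentReader = Lucene.segmentReader(readerContext.reader());
        numDocs += readerContext.reader().numDocs();
        numDeletedDocs += readerContext.reader().numDeletedDocs();
        try {
            sizeInBytes += segmentReader.getSegmentInfo().sizeInBytes();
        } catch (IOException e) {
            // size is best-effort for a stats call; the real helper logs and continues
        }
    }
    return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
}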
ReadOnlyEngine.java
@@ -22,7 +22,6 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
- import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.ReferenceManager;
@@ -36,7 +35,6 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
- import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@@ -76,7 +74,6 @@ public class ReadOnlyEngine extends Engine {
private final ElasticsearchReaderManager readerManager;
private final IndexCommit indexCommit;
private final Lock indexWriterLock;
- private final DocsStats docsStats;
private final RamAccountingRefreshListener refreshListener;
private final SafeCommitInfo safeCommitInfo;
private final CompletionStatsCache completionStatsCache;
@@ -119,7 +116,6 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
reader = wrapReader(open(indexCommit), readerWrapperFunction);
readerManager = new ElasticsearchReaderManager(reader, refreshListener);
- this.docsStats = docsStats(lastCommittedSegmentInfos);
assert translogStats != null || obtainLock : "mutiple translogs instances should not be opened at the same time";
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
@@ -182,24 +178,6 @@ protected DirectoryReader open(IndexCommit commit) throws IOException {
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES), Lucene.SOFT_DELETES_FIELD);
}

- private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
-     long numDocs = 0;
-     long numDeletedDocs = 0;
-     long sizeInBytes = 0;
-     if (lastCommittedSegmentInfos != null) {
-         for (SegmentCommitInfo segmentCommitInfo : lastCommittedSegmentInfos) {
-             numDocs += segmentCommitInfo.info.maxDoc() - segmentCommitInfo.getDelCount() - segmentCommitInfo.getSoftDelCount();
-             numDeletedDocs += segmentCommitInfo.getDelCount() + segmentCommitInfo.getSoftDelCount();
-             try {
-                 sizeInBytes += segmentCommitInfo.sizeInBytes();
-             } catch (IOException e) {
-                 throw new UncheckedIOException("Failed to get size for [" + segmentCommitInfo.info.name + "]", e);
-             }
-         }
-     }
-     return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
- }
-
@Override
protected void closeNoLock(String reason, CountDownLatch closedLatch) {
if (isClosed.compareAndSet(false, true)) {
@@ -463,11 +441,6 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {
public void maybePruneDeletes() {
}

- @Override
- public DocsStats docStats() {
-     return docsStats;
- }
-
@Override
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {

@@ -511,13 +484,9 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes();
}

- protected static DirectoryReader openDirectory(Directory directory, boolean wrapSoftDeletes) throws IOException {
+ protected static DirectoryReader openDirectory(Directory directory) throws IOException {
final DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES);
- if (wrapSoftDeletes) {
-     return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
- } else {
-     return reader;
- }
+ return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}

@Override
NoOpEngineTests.java
@@ -23,6 +23,7 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.LockObtainFailedException;
+ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@@ -101,10 +102,16 @@ public void testNoopAfterRegularEngine() throws IOException {

public void testNoOpEngineStats() throws Exception {
IOUtils.close(engine, store);
+ Settings.Builder settings = Settings.builder()
+     .put(defaultSettings.getSettings())
+     .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
+ IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
+     IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
Path translogPath = createTempDir();
- EngineConfig config = config(defaultSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
+ EngineConfig config = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
final int numDocs = scaledRandomIntBetween(10, 3000);
int deletions = 0;
try (InternalEngine engine = createEngine(config)) {
FrozenEngine.java
@@ -36,6 +36,7 @@
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.internal.io.IOUtils;
+ import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;
@@ -69,7 +70,8 @@
public final class FrozenEngine extends ReadOnlyEngine {
public static final Setting<Boolean> INDEX_FROZEN = Setting.boolSetting("index.frozen", false, Setting.Property.IndexScope,
Setting.Property.PrivateIndex);
- private final SegmentsStats stats;
+ private final SegmentsStats segmentsStats;
+ private final DocsStats docsStats;
private volatile ElasticsearchDirectoryReader lastOpenedReader;
private final ElasticsearchDirectoryReader canMatchReader;

@@ -78,15 +80,15 @@ public FrozenEngine(EngineConfig config) {

boolean success = false;
Directory directory = store.directory();
- // Do not wrap soft-deletes reader when calculating segment stats as the wrapper might filter out fully deleted segments.
- try (DirectoryReader reader = openDirectory(directory, false)) {
- // we record the segment stats here - that's what the reader needs when it's open and it give the user
+ try (DirectoryReader reader = openDirectory(directory)) {
+ // we record the segment stats and doc stats here - that's what the reader needs when it's open and it give the user
// an idea of what it can save when it's closed
- this.stats = new SegmentsStats();
+ this.segmentsStats = new SegmentsStats();
for (LeafReaderContext ctx : reader.getContext().leaves()) {
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
- fillSegmentStats(segmentReader, true, stats);
+ fillSegmentStats(segmentReader, true, segmentsStats);
}
+ this.docsStats = docsStats(reader);
final DirectoryReader wrappedReader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
canMatchReader = ElasticsearchDirectoryReader.wrap(
new RewriteCachingDirectoryReader(directory, wrappedReader.leaves()), config.getShardId());
@@ -170,7 +172,7 @@ private synchronized ElasticsearchDirectoryReader getOrOpenReader() throws IOException {
for (ReferenceManager.RefreshListener listeners : config().getInternalRefreshListener()) {
listeners.beforeRefresh();
}
- final DirectoryReader dirReader = openDirectory(engineConfig.getStore().directory(), true);
+ final DirectoryReader dirReader = openDirectory(engineConfig.getStore().directory());
reader = lastOpenedReader = wrapReader(dirReader, Function.identity());
processReader(reader);
reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
@@ -594,7 +596,7 @@ public LeafReader getDelegate() {
public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
if (includeUnloadedSegments) {
final SegmentsStats stats = new SegmentsStats();
- stats.add(this.stats);
+ stats.add(this.segmentsStats);
if (includeSegmentFileSizes == false) {
stats.clearFileSizes();
}
@@ -605,6 +607,11 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {

}

+ @Override
+ public DocsStats docStats() {
+     return docsStats;
+ }
+
synchronized boolean isReaderOpen() {
return lastOpenedReader != null;
} // this is mainly for tests