Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account soft-deletes in FrozenEngine #51192

Merged
merged 5 commits into from
Jan 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public NoOpEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
this.stats = new SegmentsStats();
Directory directory = store.directory();
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
try (DirectoryReader reader = openDirectory(directory)) {
for (LeafReaderContext ctx : reader.getContext().leaves()) {
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
fillSegmentStats(segmentReader, true, stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class ReadOnlyEngine extends Engine {
* Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an
* ID field if we are reading form memory maps.
*/
public static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
private static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
BlockTreeTermsReader.FSTLoadMode.AUTO.name());
private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
Expand Down Expand Up @@ -528,4 +528,8 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() :
maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes();
}

protected static DirectoryReader openDirectory(Directory dir) throws IOException {
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(dir, OFF_HEAP_READER_ATTRIBUTES), Lucene.SOFT_DELETES_FIELD);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public FrozenEngine(EngineConfig config) {

boolean success = false;
Directory directory = store.directory();
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
try (DirectoryReader reader = openDirectory(directory)) {
canMatchReader = ElasticsearchDirectoryReader.wrap(new RewriteCachingDirectoryReader(directory, reader.leaves()),
config.getShardId());
// we record the segment stats here - that's what the reader needs when it's open and it give the user
Expand Down Expand Up @@ -167,7 +167,7 @@ private synchronized ElasticsearchDirectoryReader getOrOpenReader() throws IOExc
for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) {
listeners.beforeRefresh();
}
final DirectoryReader dirReader = DirectoryReader.open(engineConfig.getStore().directory(), OFF_HEAP_READER_ATTRIBUTES);
final DirectoryReader dirReader = openDirectory(engineConfig.getStore().directory());
reader = lastOpenedReader = wrapReader(dirReader, Function.identity());
processReader(reader);
reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.equalTo;

public class FrozenEngineTests extends EngineTestCase {

public void testAcquireReleaseReset() throws IOException {
Expand Down Expand Up @@ -328,4 +330,30 @@ public void testCanMatch() throws IOException {
}
}
}

public void testSearchers() throws Exception {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null,
globalCheckpoint::get, new NoneCircuitBreakerService());
final int totalDocs;
try (InternalEngine engine = createEngine(config)) {
applyOperations(engine, generateHistoryOnReplica(between(10, 1000), false, randomBoolean(), randomBoolean()));
globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
engine.syncTranslog();
engine.flush();
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length;
}
}
try (FrozenEngine frozenEngine = new FrozenEngine(config)) {
try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
assertThat(topDocs.scoreDocs.length, equalTo(totalDocs));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.test.StreamsUtils;
Expand Down Expand Up @@ -595,4 +596,41 @@ private Map<String, Object> getJob(Map<String, Object> jobsMap, String targetJob
}
return null;
}

public void testFrozenIndexAfterRestarted() throws Exception {
final String index = "test_frozen_index";
if (isRunningAgainstOldCluster()) {
Settings.Builder settings = Settings.builder();
if (minimumNodeVersion().before(Version.V_8_0_0) && randomBoolean()) {
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
}
String mappings = randomBoolean() ? "\"_source\": { \"enabled\": false}" : null;
createIndex(index, settings.build(), mappings);
ensureGreen(index);
int numDocs = randomIntBetween(10, 500);
for (int i = 0; i < numDocs; i++) {
int id = randomIntBetween(0, 100);
final Request indexRequest = new Request("POST", "/" + index + "/" + "_doc/" + id);
indexRequest.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject()));
assertOK(client().performRequest(indexRequest));
if (rarely()) {
flush(index, randomBoolean());
}
}
} else {
ensureGreen(index);
final int totalHits = (int) XContentMapValues.extractValue("hits.total.value",
entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
assertOK(client().performRequest(new Request("POST", index + "/_freeze")));
ensureGreen(index);
assertNoFileBasedRecovery(index, n -> true);
final Request request = new Request("GET", "/" + index + "/_search");
request.addParameter("ignore_throttled", "false");
assertThat(XContentMapValues.extractValue("hits.total.value", entityAsMap(client().performRequest(request))),
equalTo(totalHits));
assertOK(client().performRequest(new Request("POST", index + "/_unfreeze")));
ensureGreen(index);
assertNoFileBasedRecovery(index, n -> true);
}
}
}