Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Read changes from Lucene instead of translog #30120

Merged
merged 24 commits into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
Expand Down Expand Up @@ -839,4 +840,57 @@ public int length() {
public static NumericDocValuesField newSoftDeleteField() {
return new NumericDocValuesField(SOFT_DELETE_FIELD, 1);
}

/**
* Wraps the provided {@link DirectoryReader} and return a new {@link DirectoryReader} instance that ignores
* deleted documents.
*/
public static DirectoryReader ignoreDeletes(DirectoryReader in) throws IOException {
return new NoDeletesIndexReader(in);
}

private static final class NoDeletesIndexReader extends FilterDirectoryReader {

private static final class NoDeletesIndexReaderWrapper extends SubReaderWrapper {
@Override
public LeafReader wrap(LeafReader in) {
return new FilterLeafReader(in) {
@Override
public CacheHelper getCoreCacheHelper() {
return null;
}

@Override
public CacheHelper getReaderCacheHelper() {
return null;
}

@Override
public int numDocs() {
return maxDoc();
}

@Override
public Bits getLiveDocs() {
return null;
}
};
}
}

private NoDeletesIndexReader(DirectoryReader in) throws IOException {
super(in, new NoDeletesIndexReaderWrapper());
}

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new NoDeletesIndexReader(in);
}

@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
Expand All @@ -42,6 +44,9 @@
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
Expand All @@ -53,6 +58,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;

public class LuceneTests extends ESTestCase {
public void testWaitForIndex() throws Exception {
final MockDirectoryWrapper dir = newMockDirectory();
Expand Down Expand Up @@ -406,4 +413,29 @@ public void testMMapHackSupported() throws Exception {
// add assume's here if needed for certain platforms, but we should know if it does not work.
assertTrue("MMapDirectory does not support unmapping: " + MMapDirectory.UNMAP_NOT_SUPPORTED_REASON, MMapDirectory.UNMAP_SUPPORTED);
}

public void testIgnoreDeletes() throws IOException {
try (Directory directory = newDirectory()) {
IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
indexWriterConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory, indexWriterConfig)) {
int numDocs = randomIntBetween(0, 32);
for (int i = 0; i < numDocs; i++) {
Document document = new Document();
document.add(new StringField("field", Integer.toString(i), Store.NO));

iw.addDocument(document);
if (randomBoolean()) {
iw.deleteDocuments(new Term("field", Integer.toString(i)));
}
}
try(DirectoryReader ir = DirectoryReader.open(iw.w)) {
DirectoryReader noDeletesReader = Lucene.ignoreDeletes(ir);
assertThat(noDeletesReader.numDocs(), equalTo(noDeletesReader.maxDoc()));
assertThat(noDeletesReader.numDeletedDocs(), equalTo(0));
}
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public void testFollowIndex() throws Exception {
final String leaderIndexName = "test_index1";
if (runningAgainstLeaderCluster) {
logger.info("Running against leader cluster");
Settings indexSettings = Settings.builder()
.put("index.soft_deletes", true)
.build();
createIndex(leaderIndexName, indexSettings);
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(client(), leaderIndexName, Integer.toString(i), "field", i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
Expand Down Expand Up @@ -205,73 +206,78 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
*/
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
ActionListener<Response> handler) {
if (leaderIndexMetadata == null) {
handler.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"));
return;
}

if (followIndexMetadata == null) {
handler.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"));
return;
}

if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) {
handler.onFailure(new IllegalArgumentException("leader index primary shards [" +
leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " +
"shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]"));
// TODO: other validation checks
} else {
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
responses.set(shardId, task);
finalizeResponse();
}
validate(leaderIndexMetadata, followIndexMetadata, request);

final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
responses.set(shardId, task);
finalizeResponse();
}

@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}

void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
}
}

if (error == null) {
// include task ids?
handler.onResponse(new Response());
} else {
// TODO: cancel all started tasks
handler.onFailure(error);
}
if (error == null) {
// include task ids?
handler.onResponse(new Response());
} else {
// TODO: cancel all started tasks
handler.onFailure(error);
}
}
}
);
}
});
}
}
}

static void validate(IndexMetaData leaderIndex, IndexMetaData followIndex, Request request) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");
}

if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist");
}

if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled");
}

if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
}
// TODO: other validation checks
}

}
Loading