Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fill LocalCheckpointTracker with Lucene commit #34474

Merged
merged 13 commits into from
Oct 19, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;

import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class InternalEngine extends Engine {
Expand Down Expand Up @@ -189,7 +197,6 @@ public InternalEngine(EngineConfig engineConfig) {
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy =
Expand Down Expand Up @@ -223,6 +230,8 @@ public InternalEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
this.internalSearcherManager.addListener(listener);
}
this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger,
() -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
success = true;
Expand All @@ -238,16 +247,46 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos,
Logger logger, Supplier<Searcher> searcherSupplier, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
try {
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
final long maxSeqNo = seqNoStats.maxSeqNo;
final long localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
// scans existing sequence numbers in Lucene commit, then marks them as completed in the tracker.
if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
try (Searcher engineSearcher = searcherSupplier.get()) {
final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader());
final IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, localCheckpoint + 1, maxSeqNo);
for (LeafReaderContext leaf : reader.leaves()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes Let me know if you still prefer using the "Snapshot" API.

final Scorer scorer = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f).scorer(leaf);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can create the weight once for all leaves and reuse it to build the Scorer ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jimczi. I pushed 4255bee

if (scorer == null) {
continue;
}
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This takes another approach than LuceneChangesSnapshot. @jimczi @s1monw can you please double check this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible to use the reader's PointsValue directly to speed up the search since they are always indexed.
Something like the following untested snippet:

void markAsCompleted(IndexReader reader, LocalCheckpointTracker tracker, long minSeqNo, long maxSeqNo) throws IOException {
        for (LeafReaderContext ctx : reader.leaves()) {
            PointValues values = ctx.reader().getPointValues(SeqNoFieldMapper.NAME);
            byte[] lowerPoint = new byte[values.getBytesPerDimension()];
            LongPoint.encodeDimension(minSeqNo, lowerPoint, 0);
            byte[] upperPoint = new byte[values.getBytesPerDimension()];
            LongPoint.encodeDimension(maxSeqNo, upperPoint, 0);
            final Bits liveDocs = ctx.reader().getLiveDocs() == null ?
                new Bits.MatchAllBits(reader.maxDoc()) : ctx.reader().getLiveDocs();
            int numBytes = values.getBytesPerDimension();
            values.intersect(new PointValues.IntersectVisitor() {
                @Override
                public void visit(int docID) throws IOException {
                    throw new IllegalStateException("should never be called");
                }

                @Override
                public void visit(int docID, byte[] packedValue) throws IOException {
                    if (liveDocs.get(docID) == false) {
                        return;
                    }

                    if (FutureArrays.compareUnsigned(packedValue, 0, numBytes, lowerPoint, 0, numBytes) < 0) {
                        // Doc's value is too low, in this dimension
                        return;
                    }
                    if (FutureArrays.compareUnsigned(packedValue, 0, numBytes, upperPoint, 0, numBytes) > 0) {
                        // Doc's value is too high, in this dimension
                        return;
                    }
                    long seqNo = LongPoint.decodeDimension(packedValue, 0);
                    tracker.markSeqNoAsCompleted(seqNo);
                }

                @Override
                public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
                    if (FutureArrays.compareUnsigned(minPackedValue, 0, numBytes, upperPoint, 0, numBytes) > 0 ||
                            FutureArrays.compareUnsigned(maxPackedValue, 0, numBytes, lowerPoint, 0, numBytes) < 0) {
                        return PointValues.Relation.CELL_OUTSIDE_QUERY;
                    }
                    return PointValues.Relation.CELL_CROSSES_QUERY;
                }
            });

        }
    }

This would avoid the need to execute a query to read from doc values but I am not sure if it's worth the complexity. If only few documents should be visited this is probably useless and using a query is the right approach.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pull this in, and it works perfectly. However, I am not sure if we should use it either. Normally we would expect to visit a few to several hundred documents.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is probably a premature optimization. I wrote it mainly to see how it looks but this shouldn't bring much if the number of documents to visit is low. The query+doc_values approach seems easier to maintain.

if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
throw new IllegalStateException("invalid seq_no doc_values for doc_id=" + docId);
}
final long seqNo = seqNoDocValues.longValue();
assert localCheckpoint < seqNo && seqNo <= maxSeqNo :
"local_checkpoint=" + localCheckpoint + " seq_no=" + seqNo + " max_seq_no=" + maxSeqNo;
tracker.markSeqNoAsCompleted(seqNo);
}
}
}
}
return tracker;
} catch (IOException ex) {
throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex);
}
}

private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5081,6 +5081,77 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception {
}
}

// Verifies that an engine opened from the last safe Lucene commit — WITHOUT replaying the translog — rebuilds a
// LocalCheckpointTracker that contains exactly the operations present in that commit.
public void testRebuildLocalCheckpointTracker() throws Exception {
// soft-deletes must be enabled for the tracker to be rebuilt from the Lucene commit
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Path translogPath = createTempDir();
int numOps = scaledRandomIntBetween(1, 500);
// generate a random mix of index/delete/no-op operations with consecutive seq_nos, reusing a small id space
// so that some operations update or delete earlier documents
List<Engine.Operation> operations = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
long seqNo = i;
final ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
threadPool.relativeTimeInMillis(), "test-" + i));
}
}
// apply operations out of order to create seq_no gaps between commits
Randomness.shuffle(operations);
// commits.get(k) records the operations that had been applied when the k-th flush happened;
// the initial empty list stands for the empty initial commit
List<List<Engine.Operation>> commits = new ArrayList<>();
commits.add(new ArrayList<>());
try (Store store = createStore()) {
EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
try (InternalEngine engine = createEngine(config)) {
List<Engine.Operation> flushedOperations = new ArrayList<>();
for (Engine.Operation op : operations) {
flushedOperations.add(op);
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else if (op instanceof Engine.Delete) {
engine.delete((Engine.Delete) op);
} else {
engine.noOp((Engine.NoOp) op);
}
// occasionally refresh so operations end up in multiple segments
if (randomInt(100) < 10) {
engine.refresh("test");
}
// occasionally flush, snapshot the applied operations, and advance the global checkpoint
if (randomInt(100) < 5) {
engine.flush();
commits.add(new ArrayList<>(flushedOperations));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
}
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
}
// discard commits above the global checkpoint so the engine below opens from the safe commit
trimUnsafeCommits(config);
// the safe commit is the newest commit whose operations are all at or below the global checkpoint
List<Engine.Operation> safeCommit = null;
for (int i = commits.size() - 1; i >= 0; i--) {
if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) {
safeCommit = commits.get(i);
break;
}
}
assertThat(safeCommit, notNullValue());
try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog
// the rebuilt tracker must contain a seq_no iff the corresponding operation was in the safe commit
final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker();
for (Engine.Operation op : operations) {
assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));
}
}
}
}

static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,33 @@ public void testRetryBulkShardOperations() throws Exception {
}
}

public void testOutOfOrderOnFollower() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not about out of order but rather over recovery of ops you already have. I checked and I can't find any existing tests that cover recovery (which can cover this case). I think we should have some (i.e., rename this test and extend it to cover general peer recovery (with and without holes). We also need concurrent recovery + indexing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll do.

try (ReplicationGroup leaderGroup = createGroup(0);
ReplicationGroup followerGroup = createFollowGroup(0)) {
leaderGroup.startAll();
followerGroup.startAll();
leaderGroup.indexDocs(3); // doc#1, doc#2, doc#3
IndexShard leadingPrimary = leaderGroup.getPrimary();
Translog.Operation[] operations = ShardChangesAction.getOperations(leadingPrimary, leadingPrimary.getGlobalCheckpoint(),
0, 3, leadingPrimary.getHistoryUUID(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES));
// replicate doc#1 and doc#3 and flush
BulkShardOperationsRequest firstBulk = new BulkShardOperationsRequest(followerGroup.getPrimary().shardId(),
followerGroup.getPrimary().getHistoryUUID(), Arrays.asList(operations[0], operations[2]),
leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes());
new CCRAction(firstBulk, new PlainActionFuture<>(), followerGroup).execute();
followerGroup.flush();
// replicate doc#2
BulkShardOperationsRequest secondBulk = new BulkShardOperationsRequest(followerGroup.getPrimary().shardId(),
followerGroup.getPrimary().getHistoryUUID(), Arrays.asList(operations[1]), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes());
new CCRAction(secondBulk, new PlainActionFuture<>(), followerGroup).execute();
followerGroup.syncGlobalCheckpoint();
// add a new replica
IndexShard newReplica = followerGroup.addReplica();
followerGroup.recoverReplica(newReplica);
followerGroup.assertAllEqual(3);
}
}

@Override
protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
Expand All @@ -305,6 +332,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
// Builds a replication group configured as a CCR follower; followers require soft-deletes to be enabled.
private ReplicationGroup createFollowGroup(int replicas) throws IOException {
    final Settings followSettings = Settings.builder()
        .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) // follower requires soft-deletes
        .build();
    return createGroup(replicas, followSettings);
}

Expand Down